Node location cache: off-heap storage and “array” implementation that supports parallel inserts (#131)

* Add --nodemap-type=array option for 2-3x faster OSM pass 1 imports
* Add --nodemap-storage=direct option to experiment with direct (off-heap) memory usage
* Extract ResourceUsage and OsmPhaser utilities
pull/140/head
Michael Barry 2022-03-19 05:46:03 -04:00 committed by GitHub
parent ba7b861a8f
commit bf081692ce
No known key found for this signature in the database
GPG key ID: 4AEE18F83AFDEB23
53 changed files with 3932 additions and 824 deletions
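For reference, a minimal sketch of what the two new flags select internally, via the LongLongMap factory added in this change (the path and madvise flag below are illustrative):

// "array" + "direct" stores 8 bytes per node ID in off-heap ByteBuffers;
// "array" + "mmap" writes segments to a temporary file and memory-maps it for reads.
LongLongMap nodeLocations = LongLongMap.from("array", "direct", Path.of("/tmp/nodes"), false);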

View file

@@ -123,8 +123,8 @@ Some example runtimes (excluding downloading resources):
| s3://osm-pds/2021/planet-211011.osm.pbf (65GB) | Basemap | DO 16cpu 128GB | 3h9m cpu:42h1m avg:13.3 | 99GB | [logs](planet-logs/v0.1.0-planet-do-16cpu-128gb.txt), [VisualVM Profile](planet-logs/v0.1.0-planet-do-16cpu-128gb.nps) |
| [Daylight Distribution v1.6](https://daylightmap.org/2021/09/29/daylight-v16-released.html) with ML buildings and admin boundaries (67GB) | Basemap | DO 16cpu 128GB | 3h13m cpu:43h40m avg:13.5 | 101GB | [logs](planet-logs/v0.1.0-daylight-do-16cpu-128gb.txt) |
| s3://osm-pds/2021/planet-211011.osm.pbf (65GB) | Basemap (without z13 building merge) | Linode 50cpu 128GB | 1h9m cpu:24h36m avg:21.2 | 97GB | [logs](planet-logs/v0.1.0-planet-linode-50cpu-128gb.txt), [VisualVM Profile](planet-logs/v0.1.0-planet-linode-50cpu-128gb.nps) |
| s3://osm-pds/2021/planet-211011.osm.pbf (65GB) | Basemap (without z13 building merge) | c5ad.16xlarge (64cpu/128GB) | 59m cpu:27h6m avg:27.4 | 97GB | [logs](planet-logs/v0.1.0-planet-c5ad-64cpu-128gb.txt) |
| s3://osm-pds/2021/planet-220214.osm.pbf (67GB) | Basemap v0.3.0 (without z13 building merge) | r6g.16xlarge (64cpu/512GB) with ramdisk and write to EFS | 1h1m cpu:24h33m avg:24.3 | 104GB | [logs](planet-logs/v0.3.0-planet-r6g-64cpu-512gb-ramdisk.txt) |
| s3://osm-pds/2021/planet-220307.osm.pbf (67GB) | Basemap v0.3.0 (without z13 building merge) | c5ad.16xlarge (64cpu/128GB) | 47m cpu:26h53m avg:34.2 | 97GB | [logs](planet-logs/v0.3.0-planet-c5ad-128gb.txt) |
## Alternatives

File diff suppressed because one or more lines are too long

View file

@@ -151,7 +151,9 @@ public class TransportationName implements
@Override
public void preprocessOsmNode(OsmElement.Node node) {
if (node.hasTag("highway", "motorway_junction")) {
motorwayJunctionHighwayClasses.put(node.id(), HighwayClass.UNKNOWN.value);
synchronized (motorwayJunctionHighwayClasses) {
motorwayJunctionHighwayClasses.put(node.id(), HighwayClass.UNKNOWN.value);
}
}
}
@@ -162,13 +164,15 @@ public class TransportationName implements
HighwayClass cls = HighwayClass.from(highway);
if (cls != HighwayClass.UNKNOWN) {
LongArrayList nodes = way.nodes();
for (int i = 0; i < nodes.size(); i++) {
long node = nodes.get(i);
if (motorwayJunctionHighwayClasses.containsKey(node)) {
byte oldValue = motorwayJunctionHighwayClasses.get(node);
byte newValue = cls.value;
if (newValue > oldValue) {
motorwayJunctionHighwayClasses.put(node, newValue);
synchronized (motorwayJunctionHighwayClasses) {
for (int i = 0; i < nodes.size(); i++) {
long node = nodes.get(i);
if (motorwayJunctionHighwayClasses.containsKey(node)) {
byte oldValue = motorwayJunctionHighwayClasses.get(node);
byte newValue = cls.value;
if (newValue > oldValue) {
motorwayJunctionHighwayClasses.put(node, newValue);
}
}
}
}
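With parallel inserts in pass 1, profile preprocess hooks can be invoked from multiple worker threads at once, hence the synchronized blocks above. A minimal sketch of the locking pattern, assuming hppc's LongByteHashMap as used by this profile (names illustrative):

private final LongByteHashMap junctionClasses = new LongByteHashMap();

void recordHighwayClass(long nodeId, byte value) {
  // coarse lock: junction nodes are rare relative to all way nodes, so contention stays low
  synchronized (junctionClasses) {
    if (junctionClasses.containsKey(nodeId) && value > junctionClasses.get(nodeId)) {
      junctionClasses.put(nodeId, value);
    }
  }
}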

View file

@@ -145,7 +145,9 @@ public class Waterway implements
@Override
public List<OsmRelationInfo> preprocessOsmRelation(OsmElement.Relation relation) {
if (relation.hasTag("waterway", "river") && !Utils.nullOrEmpty(relation.getString("name"))) {
riverRelationLengths.put(relation.id(), new AtomicDouble());
synchronized (riverRelationLengths) {
riverRelationLengths.put(relation.id(), new AtomicDouble());
}
return List.of(new WaterwayRelation(relation.id(), LanguageUtils.getNames(relation.tags(), translations)));
}
return null;

View file

@@ -41,15 +41,18 @@ public class LongLongMapBench {
.addProcessStats();
AtomicReference<String> writeRate = new AtomicReference<>();
new Worker("writer", Stats.inMemory(), 1, () -> {
long start = System.nanoTime();
for (long i = 0; i < entries; i++) {
map.put(i + 1L, i + 2L);
counter.count = i;
try (var writer = map.newWriter()) {
long start = System.nanoTime();
for (long i = 0; i < entries; i++) {
writer.put(i + 1L, i + 2L);
counter.count = i;
}
long end = System.nanoTime();
String rate = format.numeric(entries * NANOSECONDS_PER_SECOND / (end - start), false) + "/s";
System.err.println(
"Loaded " + entries + " in " + Duration.ofNanos(end - start).toSeconds() + "s (" + rate + ")");
writeRate.set(rate);
}
long end = System.nanoTime();
String rate = format.numeric(entries * NANOSECONDS_PER_SECOND / (end - start), false) + "/s";
System.err.println("Loaded " + entries + " in " + Duration.ofNanos(end - start).toSeconds() + "s (" + rate + ")");
writeRate.set(rate);
}).awaitAndLog(loggers, Duration.ofSeconds(10));
map.get(1);
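Writes now go through a per-thread Writer handle instead of calling put on the map directly; a hedged sketch of the contract (variable names illustrative):

try (var writer = map.newWriter()) {
  // keys must arrive in ascending order within a single writer,
  // and reads may only begin after writing is finished
  writer.put(nodeId, encodedLocation);
}
long value = map.get(nodeId); // safe from any thread afterwards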

View file

@@ -13,23 +13,20 @@ import com.onthegomap.planetiler.reader.osm.OsmReader;
import com.onthegomap.planetiler.stats.ProcessInfo;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.stats.Timers;
import com.onthegomap.planetiler.util.ByteBufferUtil;
import com.onthegomap.planetiler.util.Downloader;
import com.onthegomap.planetiler.util.FileUtils;
import com.onthegomap.planetiler.util.Format;
import com.onthegomap.planetiler.util.Geofabrik;
import com.onthegomap.planetiler.util.LogUtil;
import com.onthegomap.planetiler.util.ResourceUsage;
import com.onthegomap.planetiler.util.Translations;
import com.onthegomap.planetiler.util.Wikidata;
import com.onthegomap.planetiler.worker.RunnableThatThrows;
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -162,6 +159,10 @@ public class Planetiler {
Path path = getPath(name, "OSM input file", defaultPath, defaultUrl);
var thisInputFile = new OsmInputFile(path, config.osmLazyReads());
osmInputFile = thisInputFile;
// fail fast if there is some issue with madvise on this system
if (config.nodeMapMadvise()) {
ByteBufferUtil.init();
}
return appendStage(new Stage(
name,
List.of(
@@ -480,6 +481,9 @@ public class Planetiler {
LOGGER.info(" mbtiles: Encode each tile and write to " + output);
}
// in case any temp files are left from a previous run...
FileUtils.delete(tmpDir);
if (!toDownload.isEmpty()) {
download();
}
@@ -533,74 +537,57 @@ public class Planetiler {
}
private void checkDiskSpace() {
Map<FileStore, Long> readPhaseBytes = new HashMap<>();
Map<FileStore, Long> writePhaseBytes = new HashMap<>();
ResourceUsage readPhase = new ResourceUsage("read phase disk");
ResourceUsage writePhase = new ResourceUsage("write phase disk");
long osmSize = osmInputFile.diskUsageBytes();
long nodeMapSize = LongLongMap.estimateDiskUsage(config.nodeMapType(), config.nodeMapStorage(), osmSize);
long nodeMapSize =
LongLongMap.estimateStorageRequired(config.nodeMapType(), config.nodeMapStorage(), osmSize, tmpDir).diskUsage();
long featureSize = profile.estimateIntermediateDiskBytes(osmSize);
long outputSize = profile.estimateOutputBytes(osmSize);
try {
// node locations only needed while reading inputs
readPhaseBytes.merge(Files.getFileStore(tmpDir), nodeMapSize, Long::sum);
// feature db persists across read/write phase
readPhaseBytes.merge(Files.getFileStore(tmpDir), featureSize, Long::sum);
writePhaseBytes.merge(Files.getFileStore(tmpDir), featureSize, Long::sum);
// output only needed during write phase
writePhaseBytes.merge(Files.getFileStore(output.toAbsolutePath().getParent()), outputSize, Long::sum);
// if the user opts to remove an input source after reading to free up additional space for the output...
for (var input : inputPaths) {
if (input.freeAfterReading()) {
writePhaseBytes.merge(Files.getFileStore(input.path), -Files.size(input.path), Long::sum);
}
}
checkDiskSpaceOnDevices(readPhaseBytes, "read");
checkDiskSpaceOnDevices(writePhaseBytes, "write");
} catch (IOException e) {
LOGGER.warn("Unable to check disk space requirements, may run out of room " + e);
}
}
private void checkDiskSpaceOnDevices(Map<FileStore, Long> readPhaseBytes, String phase) throws IOException {
for (var entry : readPhaseBytes.entrySet()) {
var fs = entry.getKey();
var requested = entry.getValue();
long available = fs.getUnallocatedSpace();
if (available < requested) {
var format = Format.defaultInstance();
String warning =
"Planetiler needs ~" + format.storage(requested) + " on " + fs + " during " + phase +
" phase, which only has " + format.storage(available) + " available";
if (config.force() || requested < available * 1.25) {
LOGGER.warn(warning + ", may fail.");
} else {
throw new IllegalArgumentException(warning + ", use the --force argument to continue anyway.");
}
// node locations only needed while reading inputs
readPhase.addDisk(tmpDir, nodeMapSize, "temporary node location cache");
// feature db persists across read/write phase
readPhase.addDisk(tmpDir, featureSize, "temporary feature storage");
writePhase.addDisk(tmpDir, featureSize, "temporary feature storage");
// output only needed during write phase
writePhase.addDisk(output, outputSize, "mbtiles output");
// if the user opts to remove an input source after reading to free up additional space for the output...
for (var input : inputPaths) {
if (input.freeAfterReading()) {
writePhase.addDisk(input.path, -FileUtils.size(input.path), "delete " + input.id + " source after reading");
}
}
readPhase.checkAgainstLimits(config.force(), true);
writePhase.checkAgainstLimits(config.force(), true);
}
private void checkMemory() {
var format = Format.defaultInstance();
long nodeMap = LongLongMap.estimateMemoryUsage(config.nodeMapType(), config.nodeMapStorage(),
osmInputFile.diskUsageBytes());
long profile = profile().estimateRamRequired(osmInputFile.diskUsageBytes());
long requested = nodeMap + profile;
long jvmMemory = ProcessInfo.getMaxMemoryBytes();
Format format = Format.defaultInstance();
ResourceUsage check = new ResourceUsage("read phase");
ResourceUsage nodeMapUsages = LongLongMap.estimateStorageRequired(config.nodeMapType(), config.nodeMapStorage(),
osmInputFile.diskUsageBytes(), tmpDir);
long nodeMapDiskUsage = nodeMapUsages.diskUsage();
if (jvmMemory < requested) {
String warning =
"Planetiler needs ~" + format.storage(requested) + " memory for the JVM, but only " +
format.storage(jvmMemory) + " is available, try setting -Xmx=" + format.storage(requested).toLowerCase(
Locale.ROOT);
if (config.force() || requested < jvmMemory * 1.25) {
LOGGER.warn(warning + ", may fail.");
} else {
throw new IllegalArgumentException(warning + ", use the --force argument to continue anyway.");
check.addAll(nodeMapUsages)
.addMemory(profile().estimateRamRequired(osmInputFile.diskUsageBytes()), "temporary profile storage");
check.checkAgainstLimits(config().force(), true);
// check off-heap memory if we can get it
ProcessInfo.getSystemFreeMemoryBytes().ifPresent(extraMemory -> {
if (extraMemory < nodeMapDiskUsage) {
LOGGER.warn("""
Planetiler will use a ~%s memory-mapped file for node locations but the OS only has %s available
to cache pages; this may slow the import down. To speed up, run on a machine with more memory or
reduce the -Xmx setting.
""".formatted(
format.storage(nodeMapDiskUsage),
format.storage(extraMemory)
));
}
}
});
}
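The extracted ResourceUsage utility replaces the hand-rolled FileStore bookkeeping above; a minimal usage sketch based on the calls in this diff (byte counts illustrative):

ResourceUsage readPhase = new ResourceUsage("read phase disk");
readPhase.addDisk(tmpDir, 100_000_000_000L, "temporary node location cache");
readPhase.addMemory(2_000_000_000L, "temporary profile storage");
// warns, or throws unless --force, when an estimate exceeds what is available
readPhase.checkAgainstLimits(force, true);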
public Arguments arguments() {

View file

@@ -10,8 +10,8 @@ import java.util.stream.Stream;
/**
* A large array of primitives. A single thread appends all elements then allows random access from multiple threads.
* <p>
* {@link AppendStoreRam} stores all data in arrays in RAM and {@link AppendStoreMmap} stores all data in a
* memory-mapped file.
* {@link AppendStoreRam} stores all data in {@link java.nio.ByteBuffer ByteBuffers} in RAM and {@link AppendStoreMmap}
* stores all data in a memory-mapped file.
*/
interface AppendStore extends Closeable, MemoryEstimator.HasEstimate, DiskBacked {
@@ -37,6 +37,14 @@ interface AppendStore extends Closeable, MemoryEstimator.HasEstimate, DiskBacked
/** An array of ints. */
interface Ints extends AppendStore {
static Ints create(Storage storage, Storage.Params params) {
return switch (storage) {
case DIRECT -> new AppendStoreRam.Ints(true, params);
case RAM -> new AppendStoreRam.Ints(false, params);
case MMAP -> new AppendStoreMmap.Ints(params);
};
}
void appendInt(int value);
int getInt(long index);
@@ -45,6 +53,14 @@ interface AppendStore extends Closeable, MemoryEstimator.HasEstimate, DiskBacked
/** An array of longs. */
interface Longs extends AppendStore {
static Longs create(Storage storage, Storage.Params params) {
return switch (storage) {
case DIRECT -> new AppendStoreRam.Longs(true, params);
case RAM -> new AppendStoreRam.Longs(false, params);
case MMAP -> new AppendStoreMmap.Longs(params);
};
}
void appendLong(long value);
long getLong(long index);
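A hedged usage sketch of the new factory methods (the path is illustrative):

var params = new Storage.Params(Path.of("/tmp/store"), false /* madvise */);
AppendStore.Longs longs = AppendStore.Longs.create(Storage.DIRECT, params);
longs.appendLong(42L);         // a single thread appends...
long first = longs.getLong(0); // ...then any thread can read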

View file

@@ -1,10 +1,11 @@
package com.onthegomap.planetiler.collection;
import com.onthegomap.planetiler.util.ByteBufferUtil;
import com.onthegomap.planetiler.util.FileUtils;
import com.onthegomap.planetiler.util.MmapUtil;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
@@ -32,15 +33,12 @@ abstract class AppendStoreMmap implements AppendStore {
private volatile MappedByteBuffer[] segments;
private volatile FileChannel channel;
static {
MmapUtil.init();
}
AppendStoreMmap(Path path, boolean madvise) {
this(path, 1 << 30, madvise); // 1GB
}
AppendStoreMmap(Path path, long segmentSizeBytes, boolean madvise) {
FileUtils.createParentDirectories(path);
this.madvise = madvise;
segmentBits = (int) (Math.log(segmentSizeBytes) / Math.log(2));
segmentMask = (1L << segmentBits) - 1;
@@ -57,42 +55,21 @@ abstract class AppendStoreMmap implements AppendStore {
}
MappedByteBuffer[] getSegments() {
MappedByteBuffer[] result = segments;
if (result == null) {
if (segments == null) {
synchronized (this) {
if ((result = segments) == null) {
if (segments == null) {
try {
boolean madviseFailed = false;
// prepare the memory mapped file: stop writing, start reading
outputStream.close();
channel = FileChannel.open(path, StandardOpenOption.READ);
int segmentCount = (int) (outIdx / segmentBytes + 1);
result = new MappedByteBuffer[segmentCount];
int i = 0;
for (long segmentStart = 0; segmentStart < outIdx; segmentStart += segmentBytes) {
long segmentEnd = Math.min(segmentBytes, outIdx - segmentStart);
MappedByteBuffer thisBuffer = channel.map(FileChannel.MapMode.READ_ONLY, segmentStart, segmentEnd);
if (madvise) {
try {
MmapUtil.madvise(thisBuffer, MmapUtil.Madvice.RANDOM);
} catch (IOException e) {
if (!madviseFailed) { // log once
LOGGER.info(
"madvise not available on this system - node location lookup may be slower when less free RAM is available outside the JVM");
madviseFailed = true;
}
}
}
result[i++] = thisBuffer;
}
segments = result;
segments = ByteBufferUtil.mapFile(channel, outIdx, segmentBytes, madvise);
} catch (IOException e) {
throw new IllegalStateException("Failed preparing SequentialWriteRandomReadFile for reads", e);
throw new UncheckedIOException(e);
}
}
}
}
return result;
return segments;
}
@Override
@@ -104,7 +81,7 @@ abstract class AppendStoreMmap implements AppendStore {
}
if (segments != null) {
try {
MmapUtil.unmap(segments);
ByteBufferUtil.free(segments);
} catch (IOException e) {
LOGGER.info("Unable to unmap " + path + " " + e);
}
@@ -120,6 +97,10 @@ abstract class AppendStoreMmap implements AppendStore {
static class Ints extends AppendStoreMmap implements AppendStore.Ints {
Ints(Storage.Params params) {
this(params.path(), params.madvise());
}
Ints(Path path, boolean madvise) {
super(path, madvise);
}
@@ -156,6 +137,10 @@ abstract class AppendStoreMmap implements AppendStore {
static class Longs extends AppendStoreMmap implements AppendStore.Longs {
Longs(Storage.Params params) {
this(params.path(), params.madvise());
}
Longs(Path path, boolean madvise) {
super(path, madvise);
}
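The mapping loop that used to live in getSegments is now centralized in ByteBufferUtil.mapFile; roughly, it behaves like this sketch (an approximation, not the actual implementation):

// map a read-only file as fixed-size segments, optionally madvise(RANDOM)-ing each one
int segmentCount = (int) ((fileSize + segmentBytes - 1) / segmentBytes);
MappedByteBuffer[] result = new MappedByteBuffer[segmentCount];
for (int i = 0; i < segmentCount; i++) {
  long start = (long) i * segmentBytes;
  result[i] = channel.map(FileChannel.MapMode.READ_ONLY, start, Math.min(segmentBytes, fileSize - start));
}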

View file

@@ -1,49 +1,53 @@
package com.onthegomap.planetiler.collection;
import static com.onthegomap.planetiler.util.MemoryEstimator.POINTER_BYTES;
import static com.onthegomap.planetiler.util.MemoryEstimator.estimateByteArraySize;
import static com.onthegomap.planetiler.util.MemoryEstimator.estimateIntArraySize;
import static com.onthegomap.planetiler.util.MemoryEstimator.estimateLongArraySize;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* An array of primitives backed by arrays in RAM.
*
* @param <T> the primitive array type (i.e. {@code int[]} or {@code long[]})
* An array of primitives backed by regular or direct (native) {@link ByteBuffer ByteBuffers}.
* <p>
* Pass {@code direct=true} into constructors to use {@link ByteBuffer#allocateDirect(int)}, or false to use
* {@link ByteBuffer#allocate(int)}.
*/
abstract class AppendStoreRam<T> implements AppendStore {
abstract class AppendStoreRam implements AppendStore {
final List<T> arrays;
long size = 0;
final List<ByteBuffer> arrays;
private final boolean direct;
long writeOffset = 0;
final int slabSize;
final int slabBits;
final long slabMask;
AppendStoreRam(int segmentSizeBytes) {
AppendStoreRam(boolean direct, int segmentSizeBytes) {
this.direct = direct;
this.slabBits = (int) (Math.log(segmentSizeBytes) / Math.log(2));
if (1 << slabBits != segmentSizeBytes) {
throw new IllegalArgumentException("Segment size must be a power of 2: " + segmentSizeBytes);
}
if (segmentSizeBytes % 8 != 0) {
throw new IllegalStateException("Segment size must be a multiple of 8: " + segmentSizeBytes);
}
this.slabSize = (1 << slabBits);
this.slabMask = slabSize - 1;
this.arrays = new ArrayList<>();
}
/** Returns the slab that the next value should be written to. */
T getSlabForWrite() {
int slabIdx = (int) (size >>> slabBits);
ByteBuffer getSlabForWrite() {
int slabIdx = (int) (writeOffset >>> slabBits);
while (arrays.size() <= slabIdx) {
arrays.add(newSlab());
}
return arrays.get(slabIdx);
}
abstract T newSlab();
@Override
public long size() {
return size;
ByteBuffer newSlab() {
return direct ? ByteBuffer.allocateDirect(slabSize) : ByteBuffer.allocate(slabSize);
}
@Override
@@ -51,35 +55,41 @@ abstract class AppendStoreRam<T> implements AppendStore {
arrays.clear();
}
static class Ints extends AppendStoreRam<int[]> implements AppendStore.Ints {
static class Ints extends AppendStoreRam implements AppendStore.Ints {
Ints() {
this(1 << 20); // 1MB
Ints(boolean direct, Storage.Params params) {
this(direct);
}
Ints(int segmentSizeBytes) {
super(segmentSizeBytes >>> 2);
Ints(boolean direct) {
this(direct, 1 << 20); // 1MB
}
@Override
int[] newSlab() {
return new int[slabSize];
Ints(boolean direct, int segmentSizeBytes) {
super(direct, segmentSizeBytes);
}
@Override
public void appendInt(int value) {
int offset = (int) (size & slabMask);
getSlabForWrite()[offset] = value;
size++;
int offset = (int) (this.writeOffset & slabMask);
getSlabForWrite().putInt(offset, value);
this.writeOffset += Integer.BYTES;
}
@Override
public int getInt(long index) {
checkIndexInBounds(index);
int slabIdx = (int) (index >>> slabBits);
int offset = (int) (index & slabMask);
int[] slab = arrays.get(slabIdx);
return slab[offset];
long byteIndex = index << 2;
if (byteIndex >= writeOffset) {
throw new IndexOutOfBoundsException("index: " + index + " size: " + size());
}
int slabIdx = (int) (byteIndex >>> slabBits);
int offset = (int) (byteIndex & slabMask);
return arrays.get(slabIdx).getInt(offset);
}
@Override
public long size() {
return writeOffset >> 2;
}
@Override
@@ -88,39 +98,46 @@ abstract class AppendStoreRam<T> implements AppendStore {
}
}
static class Longs extends AppendStoreRam<long[]> implements AppendStore.Longs {
static class Longs extends AppendStoreRam implements AppendStore.Longs {
Longs() {
this(1 << 20); // 1MB
Longs(boolean direct, Storage.Params params) {
this(direct);
}
Longs(int segmentSizeBytes) {
super(segmentSizeBytes >>> 3);
Longs(boolean direct) {
this(direct, 1 << 20); // 1MB
}
@Override
protected long[] newSlab() {
return new long[slabSize];
Longs(boolean direct, int segmentSizeBytes) {
super(direct, segmentSizeBytes);
}
@Override
public void appendLong(long value) {
int offset = (int) (size & slabMask);
getSlabForWrite()[offset] = value;
size++;
int offset = (int) (this.writeOffset & slabMask);
getSlabForWrite().putLong(offset, value);
this.writeOffset += Long.BYTES;
}
@Override
public long getLong(long index) {
checkIndexInBounds(index);
int slabIdx = (int) (index >>> slabBits);
int offset = (int) (index & slabMask);
return arrays.get(slabIdx)[offset];
long byteIndex = index << 3;
if (byteIndex >= writeOffset) {
throw new IndexOutOfBoundsException("index: " + index + " size: " + size());
}
int slabIdx = (int) (byteIndex >>> slabBits);
int offset = (int) (byteIndex & slabMask);
return arrays.get(slabIdx).getLong(offset);
}
@Override
public long estimateMemoryUsageBytes() {
return arrays.size() * (estimateLongArraySize(slabSize) + POINTER_BYTES);
return arrays.size() * (estimateByteArraySize(slabSize) + POINTER_BYTES);
}
@Override
public long size() {
return writeOffset >> 3;
}
}
}
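A worked example of the slab addressing above, assuming the default 1MB slabs (slabBits = 20):

long index = 200_000;                      // getLong(200_000)
long byteIndex = index << 3;               // 1_600_000
int slabIdx = (int) (byteIndex >>> 20);    // 1: the second ByteBuffer
int offset = (int) (byteIndex & 0xFFFFF);  // 551_424: byte offset within that slab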

View file

@@ -0,0 +1,353 @@
package com.onthegomap.planetiler.collection;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;
import com.carrotsearch.hppc.BitSet;
import com.onthegomap.planetiler.stats.ProcessInfo;
import com.onthegomap.planetiler.util.ByteBufferUtil;
import com.onthegomap.planetiler.util.FileUtils;
import com.onthegomap.planetiler.util.SlidingWindow;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
/**
A map from sequential {@code long} keys to {@code long} values backed by a file on disk, where the key defines the
offset of the value within that file.
* <p>
* During write phase, values are stored in a sliding window of {@link ByteBuffer ByteBuffers} and flushed to disk when
the segment slides out of the window. During read phase, the file is memory-mapped and read.
*/
class ArrayLongLongMapMmap implements LongLongMap.ParallelWrites {
/*
* In order to limit the number of in-memory segments during writes and ensure liveness, keep track
* of the current segment index that each worker is working on in the "segments" array. Then use
* slidingWindow to make threads that try to allocate new segments wait until old segments are
* finished. Also use activeSegments semaphore to make new segments wait to allocate until
* old segments are actually flushed to disk.
*
* TODO: cleaner way to limit in-memory segments with sliding window that does not also need the semaphore?
* TODO: extract maintaining segments list into a separate utility?
*/
// 128MB per chunk
private static final int DEFAULT_SEGMENT_BITS = 27;
// work on up to 5GB of data at a time
private static final long MAX_BYTES_TO_USE = 5_000_000_000L;
private final boolean madvise;
private final int segmentBits;
private final long segmentMask;
private final long segmentBytes;
private final SlidingWindow slidingWindow;
private final Path path;
private final CopyOnWriteArrayList<AtomicInteger> segments = new CopyOnWriteArrayList<>();
private final ConcurrentHashMap<Integer, Segment> writeBuffers = new ConcurrentHashMap<>();
private final Semaphore activeSegments;
private final BitSet usedSegments = new BitSet();
FileChannel writeChannel;
private MappedByteBuffer[] segmentsArray;
private FileChannel readChannel = null;
private volatile int tail = 0;
private volatile boolean initialized = false;
ArrayLongLongMapMmap(Path path, boolean madvise) {
this(
path,
DEFAULT_SEGMENT_BITS,
guessPendingChunkLimit(1L << DEFAULT_SEGMENT_BITS),
madvise
);
}
ArrayLongLongMapMmap(Path path, int segmentBits, int maxPendingSegments, boolean madvise) {
if (segmentBits < 3) {
throw new IllegalArgumentException("Segment size must be a multiple of 8, got 2^" + segmentBits);
}
this.activeSegments = new Semaphore(maxPendingSegments);
this.madvise = madvise;
this.segmentBits = segmentBits;
segmentMask = (1L << segmentBits) - 1;
segmentBytes = 1L << segmentBits;
slidingWindow = new SlidingWindow(maxPendingSegments);
this.path = path;
try {
writeChannel = FileChannel.open(path, WRITE, CREATE);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private static int guessPendingChunkLimit(long chunkSize) {
int minChunks = 1;
int maxChunks = (int) (MAX_BYTES_TO_USE / chunkSize);
int targetChunks = (int) (ProcessInfo.getMaxMemoryBytes() * 0.5d / chunkSize);
return Math.min(maxChunks, Math.max(minChunks, targetChunks));
}
public void init() {
try {
for (Integer oldKey : writeBuffers.keySet()) {
if (oldKey < Integer.MAX_VALUE) {
// no one else needs this segment, flush it
var toFlush = writeBuffers.remove(oldKey);
if (toFlush != null) {
toFlush.flushToDisk();
}
}
}
writeChannel.close();
readChannel = FileChannel.open(path, READ);
segmentsArray = ByteBufferUtil.mapFile(readChannel, readChannel.size(), segmentBytes, madvise, usedSegments::get);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public Writer newWriter() {
return new Writer();
}
private void initOnce() {
if (!initialized) {
synchronized (this) {
if (!initialized) {
init();
initialized = true;
}
}
}
}
@Override
public long get(long key) {
initOnce();
long byteOffset = key << 3;
int idx = (int) (byteOffset >>> segmentBits);
if (idx >= segmentsArray.length) {
return LongLongMap.MISSING_VALUE;
}
MappedByteBuffer mappedByteBuffer = segmentsArray[idx];
if (mappedByteBuffer == null) {
return LongLongMap.MISSING_VALUE;
}
int offset = (int) (byteOffset & segmentMask);
long result = mappedByteBuffer.getLong(offset);
return result == 0 ? LongLongMap.MISSING_VALUE : result;
}
@Override
public long diskUsageBytes() {
return FileUtils.size(path);
}
@Override
public long estimateMemoryUsageBytes() {
return 0;
}
@Override
public void close() throws IOException {
if (readChannel != null) {
readChannel.close();
readChannel = null;
FileUtils.delete(path);
}
}
/**
* Instructions that tell a thread which segments must be flushed, and which must be allocated before any threads can
* start writing to the result segment.
*/
private static class SegmentActions {
private final List<Segment> flush = new ArrayList<>();
private final List<Segment> allocate = new ArrayList<>();
private Segment result = null;
private boolean done = false;
void setResult(Segment result) {
this.result = result;
}
void perform() {
if (!done) {
// if this thread is allocating a new segment, then wait on allocating it
// if this thread is just using one, then wait for it to become available
flush.forEach(Segment::flushToDisk);
allocate.forEach(Segment::allocate);
done = true;
}
}
ByteBuffer awaitBuffer() {
return result.await();
}
}
/**
* A segment of the storage file that threads can update in parallel, and can be flushed to disk when all threads are
* done writing to it.
*/
private class Segment {
private final int id;
private final long offset;
private CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
private Segment(int id) {
this.offset = ((long) id) << segmentBits;
this.id = id;
}
public int id() {
return id;
}
@Override
public String toString() {
return "Segment[" + id + ']';
}
ByteBuffer await() {
try {
return result.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
void allocate() {
slidingWindow.waitUntilInsideWindow(id);
try {
activeSegments.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
synchronized (usedSegments) {
usedSegments.set(id);
}
result.complete(ByteBuffer.allocate(1 << segmentBits));
}
void flushToDisk() {
try {
ByteBuffer buffer = result.get();
writeChannel.write(buffer, offset);
result = null;
activeSegments.release();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
/** Handle for a single worker thread to write values in parallel with other workers. */
private class Writer implements LongLongMap.Writer {
final AtomicInteger currentSeg = new AtomicInteger(0);
long lastSegment = -1;
long segmentOffset = -1;
ByteBuffer buffer = null;
Writer() {
segments.add(currentSeg);
}
@Override
public void close() {
SegmentActions actions = advanceTo(Integer.MAX_VALUE);
actions.perform();
}
@Override
public void put(long key, long value) {
long offset = key << 3;
long segment = offset >>> segmentBits;
// this thread is moving onto the next segment, so coordinate with other threads to allocate
// a new buffer if necessary while limiting maximum number of segments held in-memory
if (segment > lastSegment) {
if (segment >= Integer.MAX_VALUE) {
throw new IllegalArgumentException("Segment " + segment + " > Integer.MAX_VALUE");
}
SegmentActions actions = advanceTo((int) segment);
// iterate through the tail-end and free up chunks that aren't needed anymore
actions.perform();
// wait on adding a new buffer to head until the number of pending buffers is small enough
buffer = actions.awaitBuffer();
lastSegment = segment;
segmentOffset = segment << segmentBits;
}
buffer.putLong((int) (offset - segmentOffset), value);
}
private SegmentActions advanceTo(int value) {
synchronized (ArrayLongLongMapMmap.this) {
currentSeg.set(value);
SegmentActions result = new SegmentActions();
var min = segments.stream().mapToInt(AtomicInteger::get).min().orElseThrow();
if (min == Integer.MAX_VALUE) {
// all workers are done, flush everything
result.flush.addAll(writeBuffers.values());
writeBuffers.clear();
tail = min;
} else if (value == Integer.MAX_VALUE) {
// this worker is done, advance tail to min
for (Integer key : writeBuffers.keySet()) {
if (key < min) {
var segment = writeBuffers.remove(key);
if (segment != null) {
result.flush.add(segment);
}
}
}
tail = min;
} else {
// if the tail segment just finished, then advance the tail and flush all pending segments
while (tail < min) {
if (writeBuffers.containsKey(tail)) {
var segment = writeBuffers.remove(tail);
if (segment != null) {
result.flush.add(segment);
}
}
tail++;
}
Segment segment = writeBuffers.computeIfAbsent(value, id -> {
var seg = new Segment(id);
result.allocate.add(seg);
return seg;
});
result.setResult(segment);
}
// let workers waiting to allocate new segments at the head of the sliding window proceed
// NOTE: the memory hasn't been released yet, so the activeChunks semaphore will cause
// those workers to wait until the memory has been released.
slidingWindow.advanceTail(tail);
return result;
}
}
}
}
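Because the "array" layout uses the node ID itself as the index, a lookup is a fixed-offset read; a sketch of the addressing performed by get() above:

long byteOffset = nodeId << 3;                         // 8 bytes per value
int segment = (int) (byteOffset >>> segmentBits);      // 2^27 = 128MB segments by default
int offsetInSegment = (int) (byteOffset & segmentMask);
long value = segmentsArray[segment].getLong(offsetInSegment); // 0 means missing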

View file

@@ -0,0 +1,103 @@
package com.onthegomap.planetiler.collection;
import com.onthegomap.planetiler.util.MemoryEstimator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
A map from sequential {@code long} keys to {@code long} values backed by in-memory {@link ByteBuffer ByteBuffer}
segments, where the key defines the segment and the offset into that segment.
*/
class ArrayLongLongMapRam implements LongLongMap.ParallelWrites {
private final int segmentBits;
private final long segmentMask;
private final int segmentSize;
private final List<ByteBuffer> segments = new ArrayList<>();
private final AtomicInteger numSegments = new AtomicInteger(0);
private final boolean direct;
ArrayLongLongMapRam(boolean direct) {
this(direct, 20); // 1MB
}
ArrayLongLongMapRam(boolean direct, int segmentBits) {
this.direct = direct;
this.segmentBits = segmentBits;
segmentMask = (1L << segmentBits) - 1;
segmentSize = 1 << segmentBits;
}
private synchronized ByteBuffer getOrCreateSegment(int index) {
while (segments.size() <= index) {
segments.add(null);
}
if (segments.get(index) == null) {
numSegments.incrementAndGet();
segments.set(index, direct ? ByteBuffer.allocateDirect(segmentSize) : ByteBuffer.allocate(segmentSize));
}
return segments.get(index);
}
@Override
public Writer newWriter() {
return new Writer() {
long lastSegment = -1;
long segmentOffset = -1;
ByteBuffer buffer = null;
@Override
public void put(long key, long value) {
long offset = key << 3;
long segment = offset >>> segmentBits;
if (segment < lastSegment) {
throw new IllegalStateException("Writer encountered out-of-order IDs at " + key);
} else if (segment > lastSegment) {
if (segment >= Integer.MAX_VALUE) {
throw new IllegalArgumentException("Segment " + segment + " >= Integer.MAX_VALUE");
}
lastSegment = segment;
segmentOffset = segment << segmentBits;
buffer = getOrCreateSegment((int) segment);
}
buffer.putLong((int) (offset - segmentOffset), value);
}
};
}
@Override
public long get(long key) {
long byteOffset = key << 3;
int idx = (int) (byteOffset >>> segmentBits);
if (idx >= segments.size()) {
return LongLongMap.MISSING_VALUE;
}
int offset = (int) (byteOffset & segmentMask);
ByteBuffer byteBuffer = segments.get(idx);
if (byteBuffer == null) {
return LongLongMap.MISSING_VALUE;
}
long result = byteBuffer.getLong(offset);
return result == 0 ? LongLongMap.MISSING_VALUE : result;
}
@Override
public long diskUsageBytes() {
return 0;
}
@Override
public long estimateMemoryUsageBytes() {
return MemoryEstimator.estimateObjectArraySize(segments.size()) +
MemoryEstimator.estimateByteArraySize(segmentSize) * numSegments.get();
}
@Override
public void close() throws IOException {}
}
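A hedged usage sketch: each worker thread takes its own writer, and keys within one writer must be ascending (out-of-order keys throw, per put() above):

var map = new ArrayLongLongMapRam(true); // true = direct (off-heap) ByteBuffers
try (var writer = map.newWriter()) {
  writer.put(10L, 123L);
  writer.put(11L, 456L);
}
long value = map.get(10L); // 123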

View file

@@ -1,28 +1,15 @@
package com.onthegomap.planetiler.collection;
import static com.onthegomap.planetiler.util.MemoryEstimator.estimateSize;
import com.carrotsearch.hppc.ByteArrayList;
import com.onthegomap.planetiler.util.DiskBacked;
import com.onthegomap.planetiler.util.FileUtils;
import com.onthegomap.planetiler.util.MemoryEstimator;
import com.onthegomap.planetiler.util.ResourceUsage;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
/**
* A map that stores a single {@code long} value for each OSM node. A single thread writes the values for each node ID
* sequentially then multiple threads can read values concurrently.
* A map that stores a single {@code long} value for each OSM node.
* <p>
* Three implementations are provided: {@link #noop()} which ignores writes and throws on reads, {@link SortedTable}
* which stores node IDs and values sorted by node ID and does binary search on lookup, and {@link SparseArray} which
* only stores values and uses the node ID as the index into the array (with some compression to avoid storing many
* sequential 0's).
* <p>
* Use {@link SortedTable} for small OSM extracts and {@link SparseArray} when processing the entire planet.
* <p>
* Each implementation can be backed by either {@link AppendStoreRam} to store data in RAM or {@link AppendStoreMmap} to
* store data in a memory-mapped file.
* See {@link Type} for the available map implementations and {@link Storage} for the available storage implementations.
*/
public interface LongLongMap extends Closeable, MemoryEstimator.HasEstimate, DiskBacked {
/*
@@ -37,70 +24,87 @@ public interface LongLongMap extends Closeable, MemoryEstimator.HasEstimate, Dis
/**
* Returns a new longlong map from config strings.
*
* @param name which implementation to use: {@code "noop"}, {@code "sortedtable"} or {@code "sparsearray"}
* @param storage how to store data: {@code "ram"} or {@code "mmap"}
* @param name name of the {@link Type} implementation to use
* @param storage name of the {@link Storage} implementation to use
* @param path where to store data (if mmap)
* @param madvise whether to use linux madvise random to improve read performance
* @return A longlong map instance
* @throws IllegalArgumentException if {@code name} or {@code storage} is not valid
*/
static LongLongMap from(String name, String storage, Path path, boolean madvise) {
boolean ram = isRam(storage);
return from(Type.from(name), Storage.from(storage), new Storage.Params(path, madvise));
}
return switch (name) {
case "noop" -> noop();
case "sortedtable" -> ram ? newInMemorySortedTable() : newDiskBackedSortedTable(path, madvise);
case "sparsearray" -> ram ? newInMemorySparseArray() : newDiskBackedSparseArray(path, madvise);
default -> throw new IllegalArgumentException("Unexpected value: " + name);
/**
* Returns a new longlong map.
*
* @param type The {@link Type} implementation to use
* @param storage The {@link Storage} implementation to use
* @param params Parameters to pass to storage layer
* @return A longlong map instance
*/
static LongLongMap from(Type type, Storage storage, Storage.Params params) {
return switch (type) {
case NOOP -> noop();
case SPARSE_ARRAY -> new SparseArrayLongLongMap(AppendStore.Longs.create(storage, params));
case SORTED_TABLE -> new SortedTableLongLongMap(
new AppendStore.SmallLongs(i -> AppendStore.Ints.create(storage, params.resolve("keys-" + i))),
AppendStore.Longs.create(storage, params.resolve("values"))
);
case ARRAY -> switch (storage) {
case MMAP -> new ArrayLongLongMapMmap(params.path(), params.madvise());
case RAM -> new ArrayLongLongMapRam(false);
case DIRECT -> new ArrayLongLongMapRam(true);
};
};
}
/** Estimates the number of bytes of RAM this nodemap will use for a given OSM input file. */
static long estimateMemoryUsage(String name, String storage, long osmFileSize) {
boolean ram = isRam(storage);
/** Returns a new long map using {@link Type#SORTED_TABLE} and {@link Storage#RAM}. */
static LongLongMap newInMemorySortedTable() {
return from(Type.SORTED_TABLE, Storage.RAM, new Storage.Params(Path.of("."), false));
}
/** Estimates the resource requirements for this nodemap for a given OSM input file. */
static ResourceUsage estimateStorageRequired(String name, String storage, long osmFileSize, Path path) {
return estimateStorageRequired(Type.from(name), Storage.from(storage), osmFileSize, path);
}
/** Estimates the resource requirements for this nodemap for a given OSM input file. */
static ResourceUsage estimateStorageRequired(Type type, Storage storage, long osmFileSize, Path path) {
long nodes = estimateNumNodes(osmFileSize);
long maxNodeId = estimateMaxNodeId(osmFileSize);
ResourceUsage check = new ResourceUsage("long long map");
return switch (name) {
case "noop" -> 0;
case "sortedtable" -> 300_000_000L + (ram ? 12 * nodes : 0L);
case "sparsearray" -> 300_000_000L + (ram ? 9 * nodes : 0L);
default -> throw new IllegalArgumentException("Unexpected value: " + name);
};
}
/** Estimates the number of bytes of disk this nodemap will use for a given OSM input file. */
static long estimateDiskUsage(String name, String storage, long osmFileSize) {
if (isRam(storage)) {
return 0;
} else {
long nodes = estimateNumNodes(osmFileSize);
return switch (name) {
case "noop" -> 0;
case "sortedtable" -> 12 * nodes;
case "sparsearray" -> 9 * nodes;
default -> throw new IllegalArgumentException("Unexpected value: " + name);
};
}
}
private static boolean isRam(String storage) {
return switch (storage) {
case "ram" -> true;
case "mmap" -> false;
default -> throw new IllegalArgumentException("Unexpected storage value: " + storage);
return switch (type) {
case NOOP -> check;
case SPARSE_ARRAY -> check.addMemory(300_000_000L, "sparsearray node location in-memory index")
.add(path, storage, 9 * nodes, "sparsearray node location cache");
case SORTED_TABLE -> check.addMemory(300_000_000L, "sortedtable node location in-memory index")
.add(path, storage, 12 * nodes, "sortedtable node location cache");
case ARRAY -> check.add(path, storage, 8 * maxNodeId,
"array node location cache (switch to sparsearray to reduce size)");
};
}
private static long estimateNumNodes(long osmFileSize) {
// In February 2022, planet.pbf was 62GB with 750m nodes, so scale from there
return Math.round(750_000_000d * (osmFileSize / 62_000_000_000d));
// On 2/14/2022, planet.pbf was 66691979646 bytes with ~7.5b nodes, so scale from there
return Math.round(7_500_000_000d * (osmFileSize / 66_691_979_646d));
}
private static long estimateMaxNodeId(long osmFileSize) {
// On 2/14/2022, planet.pbf was 66691979646 bytes and max node ID was ~9.5b, so scale from there
// but don't go less than 9.5b in case it's an extract
return Math.round(9_500_000_000d * Math.max(1, osmFileSize / 66_691_979_646d));
}
/** Returns a longlong map that stores no data and throws on read */
static LongLongMap noop() {
return new LongLongMap() {
return new ParallelWrites() {
@Override
public void put(long key, long value) {}
public Writer newWriter() {
return (key, value) -> {
};
}
@Override
public long get(long key) {
@@ -117,48 +121,12 @@ public interface LongLongMap extends Closeable, MemoryEstimator.HasEstimate, Dis
};
}
/** Returns an in-memory longlong map that uses 12-bytes per node and binary search to find values. */
static LongLongMap newInMemorySortedTable() {
return new SortedTable(
new AppendStore.SmallLongs(i -> new AppendStoreRam.Ints()),
new AppendStoreRam.Longs()
);
}
/** Returns a memory-mapped longlong map that uses 12-bytes per node and binary search to find values. */
static LongLongMap newDiskBackedSortedTable(Path dir, boolean madvise) {
FileUtils.createDirectory(dir);
return new SortedTable(
new AppendStore.SmallLongs(i -> new AppendStoreMmap.Ints(dir.resolve("keys-" + i), madvise)),
new AppendStoreMmap.Longs(dir.resolve("values"), madvise)
);
}
/** Returns a {@link Writer} that a single thread can use to do writes into this map. */
Writer newWriter();
/**
* Returns an in-memory longlong map that uses 8-bytes per node and O(1) lookup but wastes space storing lots of 0's
* when the key space is fragmented.
*/
static LongLongMap newInMemorySparseArray() {
return new SparseArray(new AppendStoreRam.Longs());
}
/**
* Returns a memory-mapped longlong map that uses 8-bytes per node and O(1) lookup but wastes space storing lots of
* 0's when the key space is fragmented.
*/
static LongLongMap newDiskBackedSparseArray(Path path, boolean madvise) {
return new SparseArray(new AppendStoreMmap.Longs(path, madvise));
}
/**
* Writes the value for a key. Not thread safe! All writes must come from a single thread, in order by key. No writes
* can be performed after the first read.
*/
void put(long key, long value);
/**
* Returns the value for a key. Safe to be called by multiple threads after all values have been written. After the
* first read, all writes will fail.
* Returns the value for {@code key}. Safe to be called by multiple threads after all values have been written. After
* the first read, all writes will fail.
*/
long get(long key);
@@ -180,172 +148,88 @@ public interface LongLongMap extends Closeable, MemoryEstimator.HasEstimate, Dis
return result;
}
/**
* A longlong map that stores keys and values sorted by key and does a binary search to lookup values.
*/
class SortedTable implements LongLongMap {
/** Which long long map implementation to use. */
enum Type {
/** Ignore writes and throw an exception on reads. */
NOOP("noop"),
/*
* It's not actually a binary search, it keeps track of the first index of each block of 256 keys, so it
* can do an O(1) lookup to narrow down the search space to 256 values.
/**
* Store an ordered list of keys, and an ordered list of values, and on read do a binary search on keys to find the
* index of the value to read.
* <p>
* Uses exactly 12 bytes per value stored so is ideal for small extracts.
* <p>
* NOTE: Requires ordered writes from a single thread.
*/
private final AppendStore.Longs offsets = new AppendStoreRam.Longs();
private final AppendStore.Longs keys;
private final AppendStore.Longs values;
private long lastChunk = -1;
private long lastKey = -1;
SORTED_TABLE("sortedtable"),
public SortedTable(AppendStore.Longs keys, AppendStore.Longs values) {
this.keys = keys;
this.values = values;
/**
* Stores values in many small arrays indexed by key, compressing large ranges from the key space.
* <p>
* Uses around ~9 bytes per value stored as the input approaches full planet size. Ideal for full-planet imports
* when you want to use as little memory as possible.
* <p>
* NOTE: Requires ordered writes from a single thread.
*/
SPARSE_ARRAY("sparsearray"),
/**
* Stores values in indexed by key, without compressing unused ranges from the key space so that writes can be done
* from multiple threads in parallel.
* <p>
* Uses exactly {@code maxNodeId * 8} bytes. Suitable only for full-planet imports that use {@link Storage#MMAP}
* storage or have plenty of extra RAM.
*/
ARRAY("array");
private final String id;
Type(String id) {
this.id = id;
}
@Override
public void put(long key, long value) {
if (key <= lastKey) {
throw new IllegalArgumentException("Nodes must be sorted ascending by ID, " + key + " came after " + lastKey);
}
lastKey = key;
long idx = keys.size();
long chunk = key >>> 8;
if (chunk != lastChunk) {
while (offsets.size() <= chunk) {
offsets.appendLong(idx);
}
lastChunk = chunk;
}
keys.appendLong(key);
values.appendLong(value);
}
@Override
public long get(long key) {
long chunk = key >>> 8;
if (chunk >= offsets.size()) {
return MISSING_VALUE;
}
// use the "offsets" index to narrow search space to <256 values
long lo = offsets.getLong(chunk);
long hi = Math.min(keys.size(), chunk >= offsets.size() - 1 ? keys.size() : offsets.getLong(chunk + 1)) - 1;
while (lo <= hi) {
long idx = (lo + hi) >>> 1;
long value = keys.getLong(idx);
if (value < key) {
lo = idx + 1;
} else if (value > key) {
hi = idx - 1;
} else {
// found
return values.getLong(idx);
/**
* Returns the type associated with {@code id} or throws {@link IllegalArgumentException} if no match is found.
*/
public static Type from(String id) {
for (Type value : values()) {
if (value.id.equalsIgnoreCase(id.trim())) {
return value;
}
}
return MISSING_VALUE;
throw new IllegalArgumentException("Unexpected long long map type: " + id);
}
@Override
public long diskUsageBytes() {
return keys.diskUsageBytes() + values.diskUsageBytes();
}
@Override
public long estimateMemoryUsageBytes() {
return keys.estimateMemoryUsageBytes() + values.estimateMemoryUsageBytes() + offsets.estimateMemoryUsageBytes();
}
@Override
public void close() throws IOException {
keys.close();
values.close();
offsets.close();
public String id() {
return id;
}
}
/**
* A longlong map that only stores values and uses the key as an index into the array, with some tweaks to avoid
* storing many sequential 0's.
*/
class SparseArray implements LongLongMap {
/** A handle for a single thread to use to insert into this map. */
interface Writer extends AutoCloseable {
// The key space is broken into chunks of 256 and for each chunk, store:
// 1) the index in the outputs array for the first key in the block
private final AppendStore.Longs offsets = new AppendStoreRam.Longs();
// 2) the number of leading 0's at the start of each block
private final ByteArrayList offsetStartPad = new ByteArrayList();
private final AppendStore.Longs values;
private int lastChunk = -1;
private int lastOffset = 0;
private long lastKey = -1;
public SparseArray(AppendStore.Longs values) {
this.values = values;
}
/**
* Writes the value for a key. Not thread safe! All calls to this method must come from a single thread, in order by
* key. No writes can be performed after the first read.
*/
void put(long key, long value);
@Override
public void put(long key, long value) {
if (key <= lastKey) {
throw new IllegalArgumentException("Nodes must be sorted ascending by ID, " + key + " came after " + lastKey);
}
lastKey = key;
long idx = values.size();
int chunk = (int) (key >>> 8);
int offset = (int) (key & 255);
default void close() {}
}
if (chunk != lastChunk) {
// new chunk, store offset and leading zeros
lastOffset = offset;
while (offsets.size() <= chunk) {
offsets.appendLong(idx);
offsetStartPad.add((byte) offset);
}
lastChunk = chunk;
} else {
// same chunk, write not_founds until we get to right idx
while (++lastOffset < offset) {
values.appendLong(MISSING_VALUE);
}
}
values.appendLong(value);
}
/** Implementations that only support sequential writes from a single thread. */
interface SequentialWrites extends LongLongMap {
void put(long key, long value);
@Override
public long get(long key) {
int chunk = (int) (key >>> 8);
int offset = (int) (key & 255);
if (chunk >= offsets.size()) {
return MISSING_VALUE;
}
long lo = offsets.getLong(chunk);
long hi = Math.min(values.size(), chunk >= offsets.size() - 1 ? values.size() : offsets.getLong(chunk + 1)) - 1;
int startPad = offsetStartPad.get(chunk) & 255;
long index = lo + offset - startPad;
if (index > hi || index < lo) {
return MISSING_VALUE;
}
return values.getLong(index);
}
@Override
public long diskUsageBytes() {
return values.diskUsageBytes();
}
@Override
public long estimateMemoryUsageBytes() {
return values.estimateMemoryUsageBytes() + estimateSize(offsets) + estimateSize(offsetStartPad);
}
@Override
public void close() throws IOException {
offsetStartPad.release();
values.close();
offsets.close();
default Writer newWriter() {
return this::put;
}
}
/** Implementations that support parallel writes from multiple threads. */
interface ParallelWrites extends LongLongMap {}
}
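Putting the pieces together, node map configuration now flows through the Type and Storage enums; a minimal sketch (sizes and paths illustrative):

LongLongMap.Type type = LongLongMap.Type.from("array");
Storage storage = Storage.from("mmap");
ResourceUsage needs = LongLongMap.estimateStorageRequired(type, storage, osmFileSizeBytes, tmpDir);
LongLongMap map = LongLongMap.from(type, storage, new Storage.Params(tmpDir.resolve("nodes"), true));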

View file

@@ -0,0 +1,85 @@
package com.onthegomap.planetiler.collection;
import java.io.IOException;
/**
* A longlong map that stores keys and values sorted by key and does a binary search to lookup values.
*/
public class SortedTableLongLongMap implements LongLongMap, LongLongMap.SequentialWrites {
/*
* It's not actually a binary search, it keeps track of the first index of each block of 256 keys, so it
* can do an O(1) lookup to narrow down the search space to 256 values.
*/
private final AppendStore.Longs offsets = new AppendStoreRam.Longs(false);
private final AppendStore.Longs keys;
private final AppendStore.Longs values;
private long lastChunk = -1;
private long lastKey = -1;
public SortedTableLongLongMap(AppendStore.Longs keys, AppendStore.Longs values) {
this.keys = keys;
this.values = values;
}
@Override
public void put(long key, long value) {
if (key <= lastKey) {
throw new IllegalArgumentException("Nodes must be sorted ascending by ID, " + key + " came after " + lastKey);
}
lastKey = key;
long idx = keys.size();
long chunk = key >>> 8;
if (chunk != lastChunk) {
while (offsets.size() <= chunk) {
offsets.appendLong(idx);
}
lastChunk = chunk;
}
keys.appendLong(key);
values.appendLong(value);
}
@Override
public long get(long key) {
long chunk = key >>> 8;
if (chunk >= offsets.size()) {
return MISSING_VALUE;
}
// use the "offsets" index to narrow search space to <256 values
long lo = offsets.getLong(chunk);
long hi = Math.min(keys.size(), chunk >= offsets.size() - 1 ? keys.size() : offsets.getLong(chunk + 1)) - 1;
while (lo <= hi) {
long idx = (lo + hi) >>> 1;
long value = keys.getLong(idx);
if (value < key) {
lo = idx + 1;
} else if (value > key) {
hi = idx - 1;
} else {
// found
return values.getLong(idx);
}
}
return MISSING_VALUE;
}
@Override
public long diskUsageBytes() {
return keys.diskUsageBytes() + values.diskUsageBytes();
}
@Override
public long estimateMemoryUsageBytes() {
return keys.estimateMemoryUsageBytes() + values.estimateMemoryUsageBytes() + offsets.estimateMemoryUsageBytes();
}
@Override
public void close() throws IOException {
keys.close();
values.close();
offsets.close();
}
}
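A worked example of the chunk index above, which narrows each lookup to at most 256 keys before the search loop runs:

long key = 1000;
long chunk = key >>> 8; // 3
// search space: indices offsets.getLong(3) through offsets.getLong(4) - 1, at most 256 entries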

View file

@@ -0,0 +1,93 @@
package com.onthegomap.planetiler.collection;
import static com.onthegomap.planetiler.util.MemoryEstimator.estimateSize;
import com.carrotsearch.hppc.ByteArrayList;
import java.io.IOException;
/**
* A longlong map that only stores values and uses the key as an index into the array, with some tweaks to avoid storing
* many sequential 0's.
*/
public class SparseArrayLongLongMap implements LongLongMap, LongLongMap.SequentialWrites {
// The key space is broken into chunks of 256 and for each chunk, store:
// 1) the index in the outputs array for the first key in the block
private final AppendStore.Longs offsets = new AppendStoreRam.Longs(false);
// 2) the number of leading 0's at the start of each block
private final ByteArrayList offsetStartPad = new ByteArrayList();
private final AppendStore.Longs values;
private int lastChunk = -1;
private int lastOffset = 0;
private long lastKey = -1;
public SparseArrayLongLongMap(AppendStore.Longs values) {
this.values = values;
}
@Override
public void put(long key, long value) {
if (key <= lastKey) {
throw new IllegalArgumentException("Nodes must be sorted ascending by ID, " + key + " came after " + lastKey);
}
lastKey = key;
long idx = values.size();
int chunk = (int) (key >>> 8);
int offset = (int) (key & 255);
if (chunk != lastChunk) {
// new chunk, store offset and leading zeros
lastOffset = offset;
while (offsets.size() <= chunk) {
offsets.appendLong(idx);
offsetStartPad.add((byte) offset);
}
lastChunk = chunk;
} else {
// same chunk, write not_founds until we get to right idx
while (++lastOffset < offset) {
values.appendLong(MISSING_VALUE);
}
}
values.appendLong(value);
}
@Override
public long get(long key) {
int chunk = (int) (key >>> 8);
int offset = (int) (key & 255);
if (chunk >= offsets.size()) {
return MISSING_VALUE;
}
long lo = offsets.getLong(chunk);
long hi = Math.min(values.size(), chunk >= offsets.size() - 1 ? values.size() : offsets.getLong(chunk + 1)) - 1;
int startPad = offsetStartPad.get(chunk) & 255;
long index = lo + offset - startPad;
if (index > hi || index < lo) {
return MISSING_VALUE;
}
return values.getLong(index);
}
@Override
public long diskUsageBytes() {
return values.diskUsageBytes();
}
@Override
public long estimateMemoryUsageBytes() {
return values.estimateMemoryUsageBytes() + estimateSize(offsets) + estimateSize(offsetStartPad);
}
@Override
public void close() throws IOException {
offsetStartPad.release();
values.close();
offsets.close();
}
}
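A worked example of the sparse layout above (the key space is chunked into blocks of 256):

long key = 1_000_005L;
int chunk = (int) (key >>> 8);  // 3906: which block of 256 keys
int offset = (int) (key & 255); // 69: position within that block
// value index = offsets.getLong(3906) + 69 - (offsetStartPad.get(3906) & 255)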

View file

@@ -0,0 +1,47 @@
package com.onthegomap.planetiler.collection;
import java.nio.file.Path;
/**
* Storage method to use for {@link LongLongMap} and {@link AppendStore} implementations.
*/
public enum Storage {
/** Primitive {@code int[]} or {@code long[]} arrays stored on the JVM heap. */
RAM("ram"),
/** Memory-mapped files stored on disk. */
MMAP("mmap"),
/** Off-heap native byte buffers stored in-memory but outside the JVM heap. */
DIRECT("direct");
private final String id;
Storage(String id) {
this.id = id;
}
public String id() {
return id;
}
/**
* Returns the storage type associated with {@code id} or throws {@link IllegalArgumentException} if no match is
* found.
*/
public static Storage from(String id) {
for (Storage value : values()) {
if (value.id.equalsIgnoreCase(id.trim())) {
return value;
}
}
throw new IllegalArgumentException("Unexpected storage type: " + id);
}
/** Options for implementations that vary by {@link Storage}. */
public record Params(Path path, boolean madvise) {
/** Returns a copy of this instance, with {@code suffix} appended to {@link #path}. */
public Params resolve(String suffix) {
return new Params(path.resolve(suffix), madvise);
}
}
}
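A small sketch of Params.resolve, which LongLongMap.from above uses to give each sub-store of a sorted table its own file:

var base = new Storage.Params(Path.of("/tmp/planetiler"), true);
var keys0 = base.resolve("keys-0");  // path = /tmp/planetiler/keys-0, same madvise flag
var values = base.resolve("values"); // path = /tmp/planetiler/values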

View file

@@ -1,6 +1,9 @@
package com.onthegomap.planetiler.config;
import com.onthegomap.planetiler.collection.LongLongMap;
import com.onthegomap.planetiler.collection.Storage;
import java.time.Duration;
import java.util.stream.Stream;
/**
* Holder for common parameters used by many components in planetiler.
@@ -74,8 +77,10 @@ public record PlanetilerConfig(
arguments.getInteger("sort_max_writers", "maximum number of concurrent write threads to use when sorting chunks",
6),
arguments
.getString("nodemap_type", "type of node location map: noop, sortedtable, or sparsearray", "sortedtable"),
arguments.getString("nodemap_storage", "storage for location map: mmap or ram", "mmap"),
.getString("nodemap_type", "type of node location map, one of " + Stream.of(LongLongMap.Type.values()).map(
t -> t.id()).toList(), LongLongMap.Type.SPARSE_ARRAY.id()),
arguments.getString("nodemap_storage", "storage for location map, one of " + Stream.of(Storage.values()).map(
Storage::id).toList(), Storage.MMAP.id()),
arguments.getBoolean("nodemap_madvise", "use linux madvise(random) to improve memory-mapped read performance",
false),
arguments.getString("http_user_agent", "User-Agent header to set when downloading files over HTTP",


@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.slf4j.Logger;
@ -78,7 +79,7 @@ public class MbtilesWriter {
.toArray(LongAccumulator[]::new);
memoizedTiles = stats.longCounter("mbtiles_memoized_tiles");
featuresProcessed = stats.longCounter("mbtiles_features_processed");
Map<String, Counter.Readable> countsByZoom = new LinkedHashMap<>();
Map<String, LongSupplier> countsByZoom = new LinkedHashMap<>();
for (int zoom = config.minzoom(); zoom <= config.maxzoom(); zoom++) {
countsByZoom.put(Integer.toString(zoom), tilesByZoom[zoom]);
}


@ -1,6 +1,7 @@
package com.onthegomap.planetiler.reader.osm;
import java.io.Closeable;
import java.util.Iterator;
import java.util.function.Consumer;
/**
@ -19,15 +20,23 @@ public interface OsmBlockSource extends Closeable {
* An individual block of raw bytes from an osm.pbf file that can be decompressed/parsed with
* {@link #decodeElements()}.
*/
interface Block {
interface Block extends Iterable<OsmElement> {
/** Create a fake block from existing elements - useful for tests. */
static Block of(Iterable<? extends OsmElement> items) {
return () -> items;
static <T extends OsmElement> Block of(Iterable<T> items) {
return () -> {
@SuppressWarnings("unchecked") Iterable<OsmElement> iterable = (Iterable<OsmElement>) items;
return iterable;
};
}
/** Decompress and parse OSM elements from this block. */
Iterable<? extends OsmElement> decodeElements();
Iterable<OsmElement> decodeElements();
@Override
default Iterator<OsmElement> iterator() {
return decodeElements().iterator();
}
default int id() {
return -1;


@ -0,0 +1,274 @@
package com.onthegomap.planetiler.reader.osm;
import static io.prometheus.client.Collector.NANOSECONDS_PER_SECOND;
import com.onthegomap.planetiler.stats.Counter;
import com.onthegomap.planetiler.stats.Timer;
import com.onthegomap.planetiler.util.Format;
import com.onthegomap.planetiler.worker.RunnableThatThrows;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Coordinates multiple workers processing OSM elements sequentially:
* <ul>
* <li>Ensures nodes are processed first, then ways, then relations</li>
* <li>Keeps a count of elements processed</li>
* <li>Lets workers wait to start an element type until all workers are finished with the previous element type
* ({@link ForWorker#arriveAndWaitForOthers(Phase)})</li>
* <li>Logs when starting a new phase, including how long the previous phase took and the number of elements per
* second</li>
* </ul>
* Each worker should call {@link #forWorker()} to get a handle for that worker to coordinate with others.
*/
class OsmPhaser {
private final Format FORMAT = Format.defaultInstance();
private final Logger LOGGER = LoggerFactory.getLogger(OsmPhaser.class);
private final ConcurrentHashMap<Phase, Timer> startTimes = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Phase, Counter.MultiThreadCounter> counts = new ConcurrentHashMap<>();
private final Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
allWorkersFinished(Phase.from(phase));
return super.onAdvance(phase, registeredParties);
}
};
private volatile int registered = 0;
/**
* Creates a new phaser expecting a certain number of workers to register.
*/
OsmPhaser(int workers) {
phaser.bulkRegister(workers);
}
long nodes() {
return getCount(Phase.NODES);
}
long ways() {
return getCount(Phase.WAYS);
}
long relations() {
return getCount(Phase.RELATIONS);
}
private void workerStarted(Phase phase) {
startTimes.computeIfAbsent(phase, p -> Timer.start());
}
private void allWorkersFinished(Phase phase) {
var timer = startTimes.get(phase);
if (timer != null) {
timer.stop();
}
String summary = getSummary(phase);
if (summary != null) {
LOGGER.info("Finished " + getSummary(phase));
}
}
private long getCount(Phase phase) {
var counter = counts.get(phase);
return counter == null ? 0 : counter.get();
}
private String getSummary(Phase phase) {
String result = null;
var timer = startTimes.get(phase);
long count = getCount(phase);
if (timer != null && count > 0) {
double rate = count * NANOSECONDS_PER_SECOND / timer.elapsed().wall().toNanos();
result = phase + ": " + FORMAT.integer(count) + " (" + FORMAT.numeric(rate) + "/s) in " + timer;
}
return result;
}
/**
* Prints how many elements were processed per phase, how long they took, and how many elements per second were
* processed.
*/
public void printSummary() {
LOGGER.debug(" " + getSummary(Phase.NODES));
LOGGER.debug(" " + getSummary(Phase.WAYS));
LOGGER.debug(" " + getSummary(Phase.RELATIONS));
}
/**
* Indicate that {@code workers} workers will be registering eventually, so that if one starts early it doesn't
* advance through phases before it knows about the other workers.
*/
public void registerWorkers(int workers) {
phaser.bulkRegister(workers);
}
/**
* Returns a new {@link ForWorker} handle for a worker to use to coordinate with other workers.
* <p>
* If {@link #registerWorkers(int)} has not been called yet, or called with a smaller number, this will automatically
* register another worker.
*/
public ForWorker forWorker() {
return new ForWorker();
}
Phase getPhase() {
return Phase.from(phaser.getPhase());
}
public enum Phase {
/** Before processing any OSM elements */
BEGIN(0),
NODES(1),
WAYS(2),
RELATIONS(3),
/** After finished processing all OSM elements */
DONE(4);
private final int number;
Phase(int number) {
this.number = number;
}
private static Phase from(int number) {
for (var phase : values()) {
if (number == phase.number) {
return phase;
}
}
return number < 0 ? Phase.BEGIN : Phase.DONE;
}
public Phase next() {
return from(number + 1);
}
@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
}
public Phase prev() {
return from(number - 1);
}
}
/** Handle for a worker to use to coordinate with other workers processing OSM elements. */
class ForWorker implements AutoCloseable {
/*
* This worker keeps track of the current phase in non-thread-safe variables and, when the phase
* changes, coordinates with other threads through the Phaser.
*/
private final EnumMap<Phase, List<RunnableThatThrows>> finishActions = new EnumMap<>(Phase.class);
private Phase currentPhase = Phase.BEGIN;
private Counter counterForPhase;
// we don't increment a counter until after an element is processed, so we need to track whether
// we skipped the current phase so that we don't increment the counter when the phase is finished
private boolean skippedPhase = true;
private ForWorker() {
synchronized (phaser) {
registered++;
if (registered > phaser.getRegisteredParties()) {
phaser.register();
}
}
}
/** Registers {@code action} to run after this worker finishes {@code phase}. */
public ForWorker whenWorkerFinishes(Phase phase, RunnableThatThrows action) {
finishActions.computeIfAbsent(phase, p -> new ArrayList<>()).add(action);
return this;
}
private void advance(Phase newPhase, boolean waitForOthers, boolean isSkip) {
if (currentPhase == newPhase) {
// normal case - still working on same phase
counterForPhase.inc();
} else if (newPhase.number < currentPhase.number) {
throw new IllegalStateException(
"Elements must be sorted with nodes first, then ways, then relations. Encountered " + newPhase + " after " +
currentPhase);
} else {
// advance to next phase
// but first, check if we skipped over some phases
while (currentPhase.next() != newPhase) {
advance(currentPhase.next(), true, true);
}
for (var action : finishActions.getOrDefault(currentPhase, List.of())) {
action.runAndWrapException();
}
// increment the counter for the last element processed from the last phase, unless
// the last phase was BEGIN, or we skipped over the last phase.
if (counterForPhase != null && !skippedPhase) {
counterForPhase.inc();
}
skippedPhase = isSkip;
currentPhase = currentPhase.next();
counterForPhase = counts.computeIfAbsent(currentPhase, p -> Counter.newMultiThreadCounter()).counterForThread();
// don't let a worker move to next phase if other workers aren't even on that phase yet
if (phaser.getPhase() < currentPhase.number - 1) {
waitForAllToFinish(currentPhase.prev().prev());
}
phaser.arrive();
if (waitForOthers) {
waitForAllToFinish(currentPhase.prev());
}
workerStarted(currentPhase);
// don't increment the counter since we want the count to go up after each element
// was processed (not before).
}
}
private void waitForAllToFinish(Phase phase) {
try {
phaser.awaitAdvanceInterruptibly(phase.number);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
/**
* Starts processing an element in {@code phase} without waiting for other workers to start on the phase.
* <p>
* If this is the first element in that phase, then the previous phase is marked as done which triggers any handler
* registered through {@link #whenWorkerFinishes(Phase, RunnableThatThrows)}. If this was the last worker on the
* previous phase, then the overall phase will advance into {@code phase} now.
* <p>
* This could block if we need to wait for other workers to get to the previous phase before we leave it.
*
* @throws RuntimeException if the thread is interrupted while waiting
*/
public void arrive(Phase phase) {
advance(phase, false, false);
}
/**
* Waits for all workers to get to this point before starting to process the first element in {@code phase}.
*
* @throws RuntimeException if the thread is interrupted while waiting
*/
public void arriveAndWaitForOthers(Phase phase) {
advance(phase, true, false);
}
@Override
public void close() {
advance(Phase.DONE, false, false);
}
}
}
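
A hedged sketch of the lifecycle this class coordinates, using a single worker and pretend elements (assumes the same package, since the class is package-private); the real call sites are in OsmReader below:

OsmPhaser phaser = new OsmPhaser(1);
try (var handle = phaser.forWorker()
  .whenWorkerFinishes(OsmPhaser.Phase.NODES, () -> System.out.println("flush node writer"))) {
  handle.arrive(OsmPhaser.Phase.NODES);                // node #1
  handle.arrive(OsmPhaser.Phase.NODES);                // node #2
  handle.arriveAndWaitForOthers(OsmPhaser.Phase.WAYS); // barrier, then way #1
  handle.arrive(OsmPhaser.Phase.RELATIONS);            // relation #1
} // close() advances through DONE, counting the final relation
phaser.printSummary(); // nodes: 2, ways: 1, relations: 1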


@ -35,7 +35,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -70,9 +70,6 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
private final Stats stats;
private final LongLongMap nodeLocationDb;
private final Counter.Readable PASS1_BLOCKS = Counter.newSingleThreadCounter();
private final Counter.Readable PASS1_NODES = Counter.newSingleThreadCounter();
private final Counter.Readable PASS1_WAYS = Counter.newSingleThreadCounter();
private final Counter.Readable PASS1_RELATIONS = Counter.newSingleThreadCounter();
private final Profile profile;
private final String name;
private final AtomicLong relationInfoSizes = new AtomicLong(0);
@ -83,18 +80,22 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
private LongObjectHashMap<OsmRelationInfo> relationInfo = Hppc.newLongObjectHashMap();
// ~800mb, ~1.6GB when sorting
private LongLongMultimap wayToRelations = LongLongMultimap.newSparseUnorderedMultimap();
private final Object wayToRelationsLock = new Object();
// for multipolygons need to store way info (20m ways, 800m nodes) to use when processing relations (4.5m)
// ~300mb
private LongHashSet waysInMultipolygon = new LongHashSet();
private final Object waysInMultipolygonLock = new Object();
// ~7GB
private LongLongMultimap multipolygonWayGeometries = LongLongMultimap.newDensedOrderedMultimap();
// keep track of data needed to encode/decode role strings into a long
private final ObjectIntHashMap<String> roleIds = new ObjectIntHashMap<>();
private final IntObjectHashMap<String> roleIdsReverse = new IntObjectHashMap<>();
private final AtomicLong roleSizes = new AtomicLong(0);
private final OsmPhaser pass1Phaser = new OsmPhaser(0);
/**
* Constructs a new {@code OsmReader} from
* Constructs a new {@code OsmReader} from an {@code osmSourceProvider} that will use {@code nodeLocationDb} as a
* temporary store for node locations.
*
* @param name ID for this reader to use in stats and logs
* @param osmSourceProvider the file to read raw nodes, ways, and relations from
@ -113,9 +114,9 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
stats.monitorInMemoryObject("osm_relations", this);
stats.counter("osm_pass1_elements_processed", "type", () -> Map.of(
"blocks", PASS1_BLOCKS,
"nodes", PASS1_NODES,
"ways", PASS1_WAYS,
"relations", PASS1_RELATIONS
"nodes", pass1Phaser::nodes,
"ways", pass1Phaser::ways,
"relations", pass1Phaser::relations
));
}
@ -130,141 +131,150 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
* @param config user-provided arguments to control the number of threads, and log interval
*/
public void pass1(PlanetilerConfig config) {
record BlockWithResult(OsmBlockSource.Block block, WeightedHandoffQueue<OsmElement> result) {}
var timer = stats.startStage("osm_pass1");
int parseThreads = Math.max(1, config.threads() - 2);
int pendingBlocks = parseThreads * 2;
// Each worker will hand off finished elements to the single process thread. A Future<List<OsmElement>> would result
// in too much memory usage/GC so use a WeightedHandoffQueue instead which will fill up with lightweight objects
// like nodes without any tags, but limit the number of pending heavy entities like relations
int handoffQueueBatches = Math.max(
10,
(int) (100d * ProcessInfo.getMaxMemoryBytes() / 100_000_000_000d)
);
var parsedBatches = new WorkQueue<WeightedHandoffQueue<OsmElement>>("elements", pendingBlocks, 1, stats);
var pipeline = WorkerPipeline.start("osm_pass1", stats);
var readBranch = pipeline
.<BlockWithResult>fromGenerator("read", next -> {
osmBlockSource.forEachBlock((block) -> {
WeightedHandoffQueue<OsmElement> result = new WeightedHandoffQueue<>(handoffQueueBatches, 10_000);
parsedBatches.accept(result);
next.accept(new BlockWithResult(block, result));
});
parsedBatches.close();
})
.addBuffer("pbf_blocks", pendingBlocks)
.sinkToConsumer("parse", parseThreads, block -> {
List<OsmElement> result = new ArrayList<>();
boolean nodesDone = false, waysDone = false;
for (var element : block.block.decodeElements()) {
if (element instanceof OsmElement.Node node) {
// pre-compute encoded location in worker threads since it is fairly expensive and should be done in parallel
node.encodedLocation();
if (nodesDone) {
throw new IllegalArgumentException(
"Input file must be sorted with nodes first, then ways, then relations. Encountered node " + node.id() +
" after a way or relation");
}
} else if (element instanceof OsmElement.Way way) {
nodesDone = true;
if (waysDone) {
throw new IllegalArgumentException(
"Input file must be sorted with nodes first, then ways, then relations. Encountered way " + way.id() +
" after a relation");
}
} else if (element instanceof OsmElement.Relation) {
nodesDone = waysDone = true;
}
block.result.accept(element, element.cost());
}
block.result.close();
});
var processBranch = pipeline
.readFromQueue(parsedBatches)
.sinkToConsumer("process", 1, this::processPass1Block);
CompletableFuture<?> done;
var loggers = ProgressLoggers.create()
.addRateCounter("nodes", PASS1_NODES, true)
.addRateCounter("nodes", pass1Phaser::nodes, true)
.addFileSizeAndRam(nodeLocationDb)
.addRateCounter("ways", PASS1_WAYS, true)
.addRateCounter("rels", PASS1_RELATIONS, true)
.addRateCounter("ways", pass1Phaser::ways, true)
.addRateCounter("rels", pass1Phaser::relations, true)
.addRateCounter("blocks", PASS1_BLOCKS)
.newLine()
.addProcessStats()
.addInMemoryObject("hppc", this)
.newLine()
.addPipelineStats(readBranch)
.addPipelineStats(processBranch);
.newLine();
loggers.awaitAndLog(joinFutures(readBranch.done(), processBranch.done()), config.logInterval());
if (nodeLocationDb instanceof LongLongMap.ParallelWrites) {
// If the node location writer supports parallel writes, then parse, process, and write node locations from worker threads
int parseThreads = Math.max(1, config.threads() - 1);
pass1Phaser.registerWorkers(parseThreads);
var parallelPipeline = pipeline
.fromGenerator("read", osmBlockSource::forEachBlock)
.addBuffer("pbf_blocks", parseThreads * 2)
.sinkTo("process", parseThreads, this::processPass1Blocks);
loggers.addPipelineStats(parallelPipeline);
done = parallelPipeline.done();
} else {
// If the node location writer requires sequential writes, then the reader hands each block to a worker
// along with a handle that its parsed elements will go onto, and a single process thread consumes those
// handles in order as elements become ready
int parseThreads = Math.max(1, config.threads() - 2);
int pendingBlocks = parseThreads * 2;
// Each worker will hand off finished elements to the single process thread. A Future<List<OsmElement>> would result
// in too much memory usage/GC so use a WeightedHandoffQueue instead which will fill up with lightweight objects
// like nodes without any tags, but limit the number of pending heavy entities like relations
int handoffQueueBatches = Math.max(
10,
(int) (100d * ProcessInfo.getMaxMemoryBytes() / 20_000_000_000d)
);
record BlockWithResult(OsmBlockSource.Block block, WeightedHandoffQueue<OsmElement> result) {}
pass1Phaser.registerWorkers(1);
var parsedBatches = new WorkQueue<WeightedHandoffQueue<OsmElement>>("elements", pendingBlocks, 1, stats);
var readBranch = pipeline
.<BlockWithResult>fromGenerator("read", next -> {
osmBlockSource.forEachBlock((block) -> {
WeightedHandoffQueue<OsmElement> result = new WeightedHandoffQueue<>(handoffQueueBatches, 10_000);
parsedBatches.accept(result);
next.accept(new BlockWithResult(block, result));
});
parsedBatches.close();
})
.addBuffer("pbf_blocks", pendingBlocks)
.sinkToConsumer("parse", parseThreads, block -> {
for (var element : block.block.decodeElements()) {
if (element instanceof OsmElement.Node node) {
// pre-compute encoded location in worker threads since it is fairly expensive and should be done in parallel
node.encodedLocation();
}
block.result.accept(element, element.cost());
}
block.result.close();
});
LOGGER.debug("processed " +
"blocks:" + FORMAT.integer(PASS1_BLOCKS.get()) +
" nodes:" + FORMAT.integer(PASS1_NODES.get()) +
" ways:" + FORMAT.integer(PASS1_WAYS.get()) +
" relations:" + FORMAT.integer(PASS1_RELATIONS.get()));
var processBranch = pipeline
.readFromQueue(parsedBatches)
.sinkTo("process", 1, this::processPass1Blocks);
loggers
.addPipelineStats(readBranch)
.addPipelineStats(processBranch);
done = joinFutures(readBranch.done(), processBranch.done());
}
loggers.awaitAndLog(done, config.logInterval());
LOGGER.debug("Processed " + FORMAT.integer(PASS1_BLOCKS.get()) + " blocks:");
pass1Phaser.printSummary();
timer.stop();
}
void processPass1Block(Iterable<? extends OsmElement> elements) {
int nodes = 0, ways = 0, relations = 0;
for (OsmElement element : elements) {
// only a single thread calls this with elements ordered by ID, so it's safe to manipulate these
// shared data structures which are not thread safe
if (element.id() < 0) {
throw new IllegalArgumentException("Negative OSM element IDs not supported: " + element);
}
if (element instanceof OsmElement.Node node) {
nodes++;
try {
profile.preprocessOsmNode(node);
} catch (Exception e) {
LOGGER.error("Error preprocessing OSM node " + node.id(), e);
}
// TODO allow limiting node storage to only ones that profile cares about
nodeLocationDb.put(node.id(), node.encodedLocation());
} else if (element instanceof OsmElement.Way way) {
ways++;
try {
profile.preprocessOsmWay(way);
} catch (Exception e) {
LOGGER.error("Error preprocessing OSM way " + way.id(), e);
}
} else if (element instanceof OsmElement.Relation relation) {
relations++;
try {
List<OsmRelationInfo> infos = profile.preprocessOsmRelation(relation);
if (infos != null) {
for (OsmRelationInfo info : infos) {
relationInfo.put(relation.id(), info);
relationInfoSizes.addAndGet(info.estimateMemoryUsageBytes());
for (var member : relation.members()) {
var type = member.type();
// TODO handle nodes in relations and super-relations
if (type == OsmElement.Type.WAY) {
wayToRelations.put(member.ref(), encodeRelationMembership(member.role(), relation.id()));
void processPass1Blocks(Iterable<? extends Iterable<? extends OsmElement>> blocks) {
// may be called by multiple threads, so we need to synchronize access to any shared data structures
try (
var nodeWriter = nodeLocationDb.newWriter();
var phases = pass1Phaser.forWorker()
.whenWorkerFinishes(OsmPhaser.Phase.NODES, nodeWriter::close)
) {
for (var block : blocks) {
for (OsmElement element : block) {
if (element.id() < 0) {
throw new IllegalArgumentException("Negative OSM element IDs not supported: " + element);
}
if (element instanceof OsmElement.Node node) {
phases.arrive(OsmPhaser.Phase.NODES);
try {
profile.preprocessOsmNode(node);
} catch (Exception e) {
LOGGER.error("Error preprocessing OSM node " + node.id(), e);
}
// TODO allow limiting node storage to only ones that profile cares about
nodeWriter.put(node.id(), node.encodedLocation());
} else if (element instanceof OsmElement.Way way) {
phases.arriveAndWaitForOthers(OsmPhaser.Phase.WAYS);
try {
profile.preprocessOsmWay(way);
} catch (Exception e) {
LOGGER.error("Error preprocessing OSM way " + way.id(), e);
}
} else if (element instanceof OsmElement.Relation relation) {
phases.arrive(OsmPhaser.Phase.RELATIONS);
try {
List<OsmRelationInfo> infos = profile.preprocessOsmRelation(relation);
if (infos != null) {
synchronized (wayToRelationsLock) {
for (OsmRelationInfo info : infos) {
relationInfo.put(relation.id(), info);
relationInfoSizes.addAndGet(info.estimateMemoryUsageBytes());
for (var member : relation.members()) {
var type = member.type();
// TODO handle nodes in relations and super-relations
if (type == OsmElement.Type.WAY) {
wayToRelations.put(member.ref(), encodeRelationMembership(member.role(), relation.id()));
}
}
}
}
}
} catch (Exception e) {
LOGGER.error("Error preprocessing OSM relation " + relation.id(), e);
}
// TODO allow limiting multipolygon storage to only ones that profile cares about
if (isMultipolygon(relation)) {
synchronized (waysInMultipolygonLock) {
for (var member : relation.members()) {
if (member.type() == OsmElement.Type.WAY) {
waysInMultipolygon.add(member.ref());
}
}
}
}
}
} catch (Exception e) {
LOGGER.error("Error preprocessing OSM relation " + relation.id(), e);
}
// TODO allow limiting multipolygon storage to only ones that profile cares about
if (isMultipolygon(relation)) {
for (var member : relation.members()) {
if (member.type() == OsmElement.Type.WAY) {
waysInMultipolygon.add(member.ref());
}
}
}
PASS1_BLOCKS.inc();
}
}
PASS1_BLOCKS.inc();
PASS1_NODES.incBy(nodes);
PASS1_WAYS.incBy(ways);
PASS1_RELATIONS.incBy(relations);
}
private static boolean isMultipolygon(OsmElement.Relation relation) {
@ -282,19 +292,16 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
int threads = config.threads();
int processThreads = Math.max(threads < 4 ? threads : (threads - 1), 1);
Counter.MultiThreadCounter blocksProcessed = Counter.newMultiThreadCounter();
Counter.MultiThreadCounter nodesProcessed = Counter.newMultiThreadCounter();
Counter.MultiThreadCounter waysProcessed = Counter.newMultiThreadCounter();
Counter.MultiThreadCounter relsProcessed = Counter.newMultiThreadCounter();
// track relation count separately because they get enqueued onto the distributor near the end
Counter.MultiThreadCounter relationsProcessed = Counter.newMultiThreadCounter();
OsmPhaser pass2Phaser = new OsmPhaser(processThreads);
stats.counter("osm_pass2_elements_processed", "type", () -> Map.of(
"blocks", blocksProcessed,
"nodes", nodesProcessed,
"ways", waysProcessed,
"relations", relsProcessed
"blocks", blocksProcessed::get,
"nodes", pass2Phaser::nodes,
"ways", pass2Phaser::ways,
"relations", relationsProcessed
));
// since multiple threads process OSM elements, and we must process all ways before processing any relations,
// use a count down latch to wait for all threads to finish processing ways
CountDownLatch waitForWays = new CountDownLatch(processThreads);
// Use a Distributor to keep all worker threads busy when processing the final blocks of relations by offloading
// items to threads that are done reading blocks
Distributor<OsmElement.Relation> relationDistributor = Distributor.createWithCapacity(1_000);
@ -305,11 +312,9 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
.<SortableFeature>addWorker("process", processThreads, (prev, next) -> {
// avoid contention trying to get the thread-local counters by getting them once when thread starts
Counter blocks = blocksProcessed.counterForThread();
Counter nodes = nodesProcessed.counterForThread();
Counter ways = waysProcessed.counterForThread();
Counter rels = relsProcessed.counterForThread();
Counter rels = relationsProcessed.counterForThread();
var waysDone = false;
var phaser = pass2Phaser.forWorker();
var featureCollectors = new FeatureCollector.Factory(config, stats);
final NodeLocationProvider nodeLocations = newNodeLocationProvider();
FeatureRenderer renderer = createFeatureRenderer(writer, config, next);
@ -322,22 +327,16 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
});
for (var block : prev) {
int blockNodes = 0, blockWays = 0;
for (var element : block.decodeElements()) {
SourceFeature feature = null;
if (element instanceof OsmElement.Node node) {
blockNodes++;
phaser.arrive(OsmPhaser.Phase.NODES);
feature = processNodePass2(node);
} else if (element instanceof OsmElement.Way way) {
blockWays++;
phaser.arrive(OsmPhaser.Phase.WAYS);
feature = processWayPass2(way, nodeLocations);
} else if (element instanceof OsmElement.Relation relation) {
// ensure all ways finished processing before we start relations
if (!waysDone && waitForWays.getCount() > 0) {
waitForWays.countDown();
waitForWays.await();
waysDone = true;
}
phaser.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
relationHandler.accept(relation);
}
// render features specified by profile and hand them off to next step that will
@ -346,13 +345,10 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
render(featureCollectors, renderer, element, feature);
}
}
blocks.inc();
nodes.incBy(blockNodes);
ways.incBy(blockWays);
}
// just in case a worker skipped over all relations
waitForWays.countDown();
phaser.close();
// do work for other threads that are still processing blocks of relations
relationHandler.close();
@ -361,10 +357,10 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
.sinkToConsumer("write", 1, writer);
var logger = ProgressLoggers.create()
.addRatePercentCounter("nodes", PASS1_NODES.get(), nodesProcessed, true)
.addRatePercentCounter("nodes", pass1Phaser.nodes(), pass2Phaser::nodes, true)
.addFileSizeAndRam(nodeLocationDb)
.addRatePercentCounter("ways", PASS1_WAYS.get(), waysProcessed, true)
.addRatePercentCounter("rels", PASS1_RELATIONS.get(), relsProcessed, true)
.addRatePercentCounter("ways", pass1Phaser.ways(), pass2Phaser::ways, true)
.addRatePercentCounter("rels", pass1Phaser.relations(), relationsProcessed, true)
.addRateCounter("features", writer::numFeaturesWritten)
.addFileSize(writer)
.addRatePercentCounter("blocks", PASS1_BLOCKS.get(), blocksProcessed, false)
@ -376,11 +372,8 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
pipeline.awaitAndLog(logger, config.logInterval());
LOGGER.debug("processed " +
"blocks:" + FORMAT.integer(blocksProcessed.get()) +
" nodes:" + FORMAT.integer(nodesProcessed.get()) +
" ways:" + FORMAT.integer(waysProcessed.get()) +
" relations:" + FORMAT.integer(relsProcessed.get()));
LOGGER.debug("Processed " + FORMAT.integer(blocksProcessed.get()) + " blocks:");
pass2Phaser.printSummary();
timer.stop();
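
Both pass-1 branches above funnel node locations through a per-worker writer that is closed when that worker finishes nodes. A hedged standalone sketch of the pattern (the iterable of nodes is hypothetical):

// Sketch: one writer per worker, closed once its nodes are done - the
// whenWorkerFinishes(NODES, nodeWriter::close) hook above arranges exactly this.
static void writeNodes(LongLongMap db, Iterable<OsmElement.Node> nodes) {
  try (var writer = db.newWriter()) {
    for (var node : nodes) {
      writer.put(node.id(), node.encodedLocation());
    }
  }
}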


@ -1,5 +1,6 @@
package com.onthegomap.planetiler.stats;
import com.onthegomap.planetiler.util.Parse;
import com.sun.management.GarbageCollectionNotificationInfo;
import com.sun.management.GcInfo;
import java.lang.management.BufferPoolMXBean;
@ -74,16 +75,6 @@ public class ProcessInfo {
.map(Duration::ofNanos);
}
/**
* Returns the amount of direct (off-heap) memory used by the JVM.
*/
public static OptionalLong getDirectMemoryUsage() {
return ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class).stream()
.filter(bufferPool -> "direct".equals(bufferPool.getName()))
.mapToLong(BufferPoolMXBean::getMemoryUsed)
.findFirst();
}
// reflection helper
private static <T> T callGetter(Method method, Object obj, Class<T> resultClazz) throws InvocationTargetException {
try {
@ -117,8 +108,55 @@ public class ProcessInfo {
}
/** Returns the JVM used memory. */
public static long getUsedMemoryBytes() {
return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
public static long getOnHeapUsedMemoryBytes() {
var runtime = Runtime.getRuntime();
return runtime.totalMemory() - runtime.freeMemory();
}
/**
* Returns the amount of direct memory (allocated through {@link java.nio.ByteBuffer#allocateDirect(int)}) used by the
* JVM.
*/
public static long getDirectUsedMemoryBytes() {
return ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class).stream()
.filter(pool -> "direct".equals(pool.getName()))
.mapToLong(BufferPoolMXBean::getTotalCapacity)
.sum();
}
/**
* Returns an estimate of the direct memory limit for this JVM by parsing the {@code -XX:MaxDirectMemorySize}
* argument.
*/
public static long getDirectUsedMemoryLimit() {
return ManagementFactory.getRuntimeMXBean().getInputArguments().stream()
.filter(arg -> arg.startsWith("-XX:MaxDirectMemorySize="))
.mapToLong(arg -> Parse.jvmMemoryStringToBytes(arg.replace("-XX:MaxDirectMemorySize=", "")))
.findFirst()
// if -XX:MaxDirectMemorySize is not explicitly specified, the direct limit defaults to -Xmx, so total
// memory used can be 2x that.
.orElseGet(ProcessInfo::getMaxMemoryBytes);
}
/**
* Returns the total amount of memory available on the system if available.
*/
public static OptionalLong getSystemMemoryBytes() {
if (ManagementFactory.getOperatingSystemMXBean() instanceof com.sun.management.OperatingSystemMXBean osBean) {
return OptionalLong.of(osBean.getTotalMemorySize());
} else {
return OptionalLong.empty();
}
}
/**
* Returns the amount of free memory on this system outside the JVM heap, if available.
*/
public static OptionalLong getSystemFreeMemoryBytes() {
return getSystemMemoryBytes().stream()
.map(value -> value - getMaxMemoryBytes())
.filter(value -> value > 0)
.findFirst();
}
/** Processor usage statistics for a thread. */
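
A small sketch tying the new accessors together into a memory report (output values are machine-dependent):

var format = Format.defaultInstance();
System.out.println("heap used:    " + format.storage(ProcessInfo.getOnHeapUsedMemoryBytes()));
System.out.println("heap max:     " + format.storage(ProcessInfo.getMaxMemoryBytes()));
System.out.println("direct used:  " + format.storage(ProcessInfo.getDirectUsedMemoryBytes()));
System.out.println("direct limit: " + format.storage(ProcessInfo.getDirectUsedMemoryLimit()));
ProcessInfo.getSystemFreeMemoryBytes()
  .ifPresent(free -> System.out.println("free off-heap: " + format.storage(free)));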


@ -18,6 +18,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -250,10 +251,10 @@ public class ProgressLoggers {
String formatted = format.percent(num);
return num > 0.6 ? red(formatted) : num > 0.3 ? yellow(formatted) : formatted;
});
loggers.add(new ProgressLogger("mem",
() -> format.storage(ProcessInfo.getUsedMemoryBytes(), false) + "/" +
loggers.add(new ProgressLogger("heap",
() -> format.storage(ProcessInfo.getOnHeapUsedMemoryBytes(), false) + "/" +
format.storage(ProcessInfo.getMaxMemoryBytes(), false) +
ProcessInfo.getDirectMemoryUsage().stream()
OptionalLong.of(ProcessInfo.getDirectUsedMemoryBytes()).stream()
.filter(usage -> usage > 0)
.mapToObj(mem -> " direct: " + format.storage(mem))
.findFirst()


@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -187,7 +188,7 @@ class PrometheusStats implements Stats {
}
@Override
public void counter(String name, String label, Supplier<Map<String, Counter.Readable>> values) {
public void counter(String name, String label, Supplier<Map<String, LongSupplier>> values) {
new Collector() {
@Override
public List<MetricFamilySamples> collect() {
@ -195,7 +196,7 @@ class PrometheusStats implements Stats {
CounterMetricFamily family = new CounterMetricFamily(BASE + sanitizeMetricName(name), "", List.of(label));
result.add(family);
for (var entry : values.get().entrySet()) {
family.addMetric(List.of(entry.getKey()), entry.getValue().get());
family.addMetric(List.of(entry.getKey()), entry.getValue().getAsLong());
}
return result;
}


@ -10,6 +10,7 @@ import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -129,7 +130,7 @@ public interface Stats extends AutoCloseable {
* Tracks a group of counters with a {@code label} key that is set to the key in each entry of the map returned from
* {@code values}.
*/
void counter(String name, String label, Supplier<Map<String, Counter.Readable>> values);
void counter(String name, String label, Supplier<Map<String, LongSupplier>> values);
/**
* Records that an invalid input feature was discarded where {@code errorCode} can be used to identify the kind of
@ -180,7 +181,7 @@ public interface Stats extends AutoCloseable {
}
@Override
public void counter(String name, String label, Supplier<Map<String, Counter.Readable>> values) {}
public void counter(String name, String label, Supplier<Map<String, LongSupplier>> values) {}
@Override
public void processedElement(String elemType, String layer) {}


@ -0,0 +1,143 @@
package com.onthegomap.planetiler.util;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.function.IntPredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utilities for working with memory-mapped and direct byte buffers.
*/
public class ByteBufferUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(ByteBufferUtil.class);
/** Attempts to invoke the native madvise utility and logs a message if it is not available. */
public static void init() {
if (Madvise.pageSize < 0) {
try {
posixMadvise(ByteBuffer.allocateDirect(1), Madvice.RANDOM);
} catch (IOException e) {
LOGGER.info("madvise not available on this system");
}
}
}
/**
* Gives the OS a hint about how a mapped memory segment will be used so it can optimize performance.
*
* @param buffer The mapped memory segment.
* @param value The advice to use.
* @throws IOException If an error occurs or madvise not available on this system
* @see <a href="https://man7.org/linux/man-pages/man3/posix_madvise.3.html">posix_madvise(3) Linux manual page</a>
*/
public static void posixMadvise(ByteBuffer buffer, Madvice value) throws IOException {
Madvise.posixMadvise(buffer, value.value);
}
/**
* Attempts to force-unmap a list of memory-mapped file segments so the underlying file can safely be deleted.
* <p>
* Can also be used to force-deallocate a direct byte buffer.
*
* @param segments The segments to free
* @throws IOException If any error occurs freeing the segment
*/
public static void free(ByteBuffer... segments) throws IOException {
try {
// attempt to force-unmap the file, so we can delete it later
// https://stackoverflow.com/questions/2972986/how-to-unmap-a-file-from-memory-mapped-using-filechannel-in-java
Class<?> unsafeClass;
try {
unsafeClass = Class.forName("sun.misc.Unsafe");
} catch (Exception ex) {
unsafeClass = Class.forName("jdk.internal.misc.Unsafe");
}
Method clean = unsafeClass.getMethod("invokeCleaner", ByteBuffer.class);
clean.setAccessible(true);
Field theUnsafeField = unsafeClass.getDeclaredField("theUnsafe");
theUnsafeField.setAccessible(true);
Object theUnsafe = theUnsafeField.get(null);
for (ByteBuffer buffer : segments) {
if (buffer != null && (buffer instanceof MappedByteBuffer || buffer.isDirect())) {
clean.invoke(theUnsafe, buffer);
}
}
} catch (Exception e) {
throw new IOException("Unable to unmap", e);
}
}
/**
* Same as {@link #mapFile(FileChannel, long, long, boolean, IntPredicate)}, except it maps every segment without
* testing whether it has data first.
*/
public static MappedByteBuffer[] mapFile(FileChannel readChannel, long expectedLength, long segmentBytes,
boolean madvise) throws IOException {
return mapFile(readChannel, expectedLength, segmentBytes, madvise, i -> true);
}
/**
* Memory-map many segments of a file.
*
* @param file A channel for the input file being read
* @param expectedLength Expected number of bytes in the file
* @param segmentBytes Number of bytes in each segment
* @param madvise {@code true} to use linux madvise random on the file to improve read performance
* @param segmentHasData Predicate that returns {@code false} when a segment index has no data
* @return The array of mapped segments.
* @throws IOException If an error occurs reading from the file
*/
public static MappedByteBuffer[] mapFile(FileChannel file, long expectedLength, long segmentBytes,
boolean madvise, IntPredicate segmentHasData) throws IOException {
assert expectedLength == file.size();
int segmentCount = (int) (expectedLength / segmentBytes);
if (expectedLength % segmentBytes != 0) {
segmentCount++;
}
MappedByteBuffer[] segmentsArray = new MappedByteBuffer[segmentCount];
int i = 0;
boolean madviseFailed = false;
for (long segmentStart = 0; segmentStart < expectedLength; segmentStart += segmentBytes) {
long segmentLength = Math.min(segmentBytes, expectedLength - segmentStart);
if (segmentHasData.test(i)) {
MappedByteBuffer buffer = file.map(FileChannel.MapMode.READ_ONLY, segmentStart, segmentLength);
if (madvise) {
try {
ByteBufferUtil.posixMadvise(buffer, ByteBufferUtil.Madvice.RANDOM);
} catch (IOException e) {
if (!madviseFailed) { // log once
LOGGER.info(
"madvise not available on this system - node location lookup may be slower when less free RAM is available outside the JVM");
madviseFailed = true;
}
}
}
segmentsArray[i] = buffer;
}
i++;
}
return segmentsArray;
}
/** Values from https://man7.org/linux/man-pages/man2/madvise.2.html */
public enum Madvice {
NORMAL(0),
RANDOM(1),
SEQUENTIAL(2),
WILLNEED(3),
DONTNEED(4);
final int value;
Madvice(int value) {
this.value = value;
}
}
}
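
A hedged usage sketch for the mapping helper (the file name and 1GB segment size are hypothetical; the real callers are the mmap-backed stores):

import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class MapFileDemo {
  public static void main(String[] args) throws Exception {
    Path file = Path.of("nodes.bin"); // hypothetical input file
    try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
      // map the whole file in 1GB segments, with madvise(random) enabled
      MappedByteBuffer[] segments = ByteBufferUtil.mapFile(channel, channel.size(), 1 << 30, true);
      // ... random-access reads against segments ...
      ByteBufferUtil.free(segments); // force-unmap so the file can be deleted
    }
  }
}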


@ -21,15 +21,12 @@ import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -46,7 +43,7 @@ import org.slf4j.LoggerFactory;
* changes.
* <p>
* For example:
*
*
* <pre>
* {@code
* Downloader.create(PlanetilerConfig.defaults())
@ -77,7 +74,7 @@ public class Downloader {
private final ExecutorService executor;
private final Stats stats;
private final long chunkSizeBytes;
private final ConcurrentMap<FileStore, Long> bytesToDownload = new ConcurrentHashMap<>();
private final ResourceUsage diskSpaceCheck = new ResourceUsage("download");
Downloader(PlanetilerConfig config, Stats stats, long chunkSizeBytes) {
this.chunkSizeBytes = chunkSizeBytes;
@ -209,7 +206,8 @@ public class Downloader {
Path tmpPath = resourceToDownload.tmpPath();
FileUtils.delete(tmpPath);
FileUtils.deleteOnExit(tmpPath);
checkDiskSpace(tmpPath, metadata.size);
diskSpaceCheck.addDisk(tmpPath, metadata.size, resourceToDownload.id);
diskSpaceCheck.checkAgainstLimits(config.force(), false);
return httpDownload(resourceToDownload, tmpPath)
.thenCompose(result -> {
try {
@ -231,27 +229,6 @@ public class Downloader {
}, executor);
}
private void checkDiskSpace(Path destination, long size) {
try {
var fs = Files.getFileStore(destination.toAbsolutePath().getParent());
var totalPendingBytes = bytesToDownload.merge(fs, size, Long::sum);
var availableBytes = fs.getUnallocatedSpace();
if (totalPendingBytes > availableBytes) {
var format = Format.defaultInstance();
String warning =
"Attempting to download " + format.storage(totalPendingBytes) + " to " + fs + " which only has " +
format.storage(availableBytes) + " available";
if (config.force()) {
LOGGER.warn(warning + ", will probably fail.");
} else {
throw new IllegalArgumentException(warning + ", use the --force argument to continue anyway.");
}
}
} catch (IOException e) {
LOGGER.warn("Unable to check file size for download, you may run out of space: " + e, e);
}
}
private CompletableFuture<ResourceMetadata> httpHeadFollowRedirects(String url, int redirects) {
if (redirects > MAX_REDIRECTS) {
throw new IllegalStateException("Exceeded " + redirects + " redirects for " + url);


@ -2,6 +2,7 @@ package com.onthegomap.planetiler.util;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
@ -96,6 +97,21 @@ public class FileUtils {
}
}
/** Returns the {@link FileStore} for {@code path}, falling back to the nearest existing parent directory if the path does not exist yet. */
public static FileStore getFileStore(final Path path) {
Path absolutePath = path.toAbsolutePath();
IOException exception = null;
while (absolutePath != null) {
try {
return Files.getFileStore(absolutePath);
} catch (IOException e) {
exception = e;
absolutePath = absolutePath.getParent();
}
}
throw new UncheckedIOException("Cannot get file store for " + path, exception);
}
/**
* Moves a file.
*
@ -129,11 +145,11 @@ public class FileUtils {
*/
public static void createParentDirectories(Path path) {
try {
if (Files.isDirectory(path)) {
if (Files.isDirectory(path) && !Files.exists(path)) {
Files.createDirectories(path);
} else {
Path parent = path.getParent();
if (parent != null) {
if (parent != null && !Files.exists(parent)) {
Files.createDirectories(parent);
}
}


@ -84,6 +84,11 @@ public class Format {
return storage(num, false);
}
/** Alias for {@link #numeric(Number, boolean)} where {@code pad=false}. */
public String numeric(Number num) {
return numeric(num, false);
}
/** Returns a number formatted like "123" "1.2k" "2.5B", etc. */
public String numeric(Number num, boolean pad) {
return format(num, pad, NUMERIC_SUFFIXES);
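
A hedged sketch of the alias in use (exact output strings depend on Format's suffix table; "123"/"1.2k"/"2.5B" follow the javadoc example above):

Format format = Format.defaultInstance();
format.numeric(123);            // "123"
format.numeric(1_200);          // "1.2k"
format.numeric(2_500_000_000L); // "2.5B"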


@ -31,7 +31,7 @@ import jnr.ffi.types.size_t;
/**
* Wrapper for native madvise function to be used via the public API
* {@link MmapUtil#madvise(ByteBuffer, MmapUtil.Madvice)}.
* {@link ByteBufferUtil#posixMadvise(ByteBuffer, ByteBufferUtil.Madvice)}.
* <p>
* Ported from <a href=
* "https://github.com/upserve/uppend/blob/70967c6f24d7f1a3bbc18799f485d981da93f53b/src/main/java/com/upserve/uppend/blobs/NativeIO.java">upserve/uppend/NativeIO</a>.
@ -69,7 +69,7 @@ class Madvise {
* @throws IOException If an error occurs or madvise not available on this system
* @see <a href="https://man7.org/linux/man-pages/man2/madvise.2.html">madvise(2) Linux manual page</a>
*/
static void madvise(ByteBuffer buffer, int value) throws IOException {
static void posixMadvise(ByteBuffer buffer, int value) throws IOException {
if (pageSize <= 0) {
throw new IOException("madvise failed, pagesize not available");
}
@ -79,7 +79,7 @@ class Madvise {
long alignedAddress = alignedAddress(address);
long alignedSize = alignedSize(alignedAddress, capacity);
try {
int val = nativeC.madvise(alignedAddress, alignedSize, value);
int val = nativeC.posix_madvise(alignedAddress, alignedSize, value);
if (val != 0) {
throw new IOException(String.format("System call madvise failed with code: %d", val));
}
@ -91,7 +91,7 @@ class Madvise {
/** JNR-FFI will automatically compile these to wrappers around native functions with the same signatures. */
public interface NativeC {
int madvise(@size_t long address, @size_t long size, int advice);
int posix_madvise(@size_t long address, @size_t long size, int advice);
int getpagesize();
}


@ -1,87 +0,0 @@
package com.onthegomap.planetiler.util;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utilities for working with memory-mapped files.
*/
public class MmapUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(MmapUtil.class);
/** Attempts to invoke native utility and logs an error message if not available. */
public static void init() {
if (Madvise.pageSize < 0) {
try {
madvise(ByteBuffer.allocateDirect(1), Madvice.RANDOM);
} catch (IOException e) {
LOGGER.info("madvise not available on this system");
}
}
}
/**
* Give a hint to the system how a mapped memory segment will be used so the OS can optimize performance.
*
* @param buffer The mapped memory segment.
* @param value The advice to use.
* @throws IOException If an error occurs or madvise not available on this system
* @see <a href="https://man7.org/linux/man-pages/man2/madvise.2.html">madvise(2) Linux manual page</a>
*/
public static void madvise(ByteBuffer buffer, Madvice value) throws IOException {
Madvise.madvise(buffer, value.value);
}
/**
* Attempt to force-unmap a list of memory-mapped file segments so it can safely be deleted.
*
* @param segments The segments to unmap
* @throws IOException If any error occurs unmapping the segment
*/
public static void unmap(MappedByteBuffer... segments) throws IOException {
try {
// attempt to force-unmap the file, so we can delete it later
// https://stackoverflow.com/questions/2972986/how-to-unmap-a-file-from-memory-mapped-using-filechannel-in-java
Class<?> unsafeClass;
try {
unsafeClass = Class.forName("sun.misc.Unsafe");
} catch (Exception ex) {
unsafeClass = Class.forName("jdk.internal.misc.Unsafe");
}
Method clean = unsafeClass.getMethod("invokeCleaner", ByteBuffer.class);
clean.setAccessible(true);
Field theUnsafeField = unsafeClass.getDeclaredField("theUnsafe");
theUnsafeField.setAccessible(true);
Object theUnsafe = theUnsafeField.get(null);
for (MappedByteBuffer buffer : segments) {
if (buffer != null) {
clean.invoke(theUnsafe, buffer);
}
}
} catch (Exception e) {
throw new IOException("Unable to unmap", e);
}
}
/** Values from https://man7.org/linux/man-pages/man2/madvise.2.html */
public enum Madvice {
NORMAL(0),
RANDOM(1),
SEQUENTIAL(2),
WILLNEED(3),
DONTNEED(4);
final int value;
Madvice(int value) {
this.value = value;
}
}
}


@ -109,4 +109,26 @@ public class Parse {
return null;
}
}
/** Parses a value for the {@code -Xmx}/{@code -Xms} JVM options like "100", "500k", or "15g". */
public static long jvmMemoryStringToBytes(String value) {
try {
value = value.strip();
char lastChar = value.charAt(value.length() - 1);
if (Character.isDigit(lastChar)) {
return Long.parseLong(value);
} else {
long base = Long.parseLong(value.substring(0, value.length() - 1));
lastChar = Character.toLowerCase(lastChar);
return switch (lastChar) {
case 'k' -> base * 1024L;
case 'm' -> base * 1024L * 1024L;
case 'g' -> base * 1024L * 1024L * 1024L;
default -> throw new NumberFormatException();
};
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Unable to parse size: " + value);
}
}
}
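
A few worked values for the parser above (the arithmetic follows the switch cases):

Parse.jvmMemoryStringToBytes("100");  // plain digits -> 100 bytes
Parse.jvmMemoryStringToBytes("500k"); // 500 * 1024 = 512_000
Parse.jvmMemoryStringToBytes("15g");  // 15 * 1024^3 = 16_106_127_360
Parse.jvmMemoryStringToBytes("15G");  // suffix is case-insensitive, same as above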


@ -0,0 +1,220 @@
package com.onthegomap.planetiler.util;
import com.onthegomap.planetiler.collection.Storage;
import com.onthegomap.planetiler.stats.ProcessInfo;
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Checks that estimated resource usage (memory, disk) will not exceed system limits, so we can fail fast before starting.
* <p>
* Each method mutates the instance and returns it for chaining. The {@code --force} option will bypass these checks.
*/
@ThreadSafe
public class ResourceUsage {
/** Free system memory: total system RAM minus the JVM -Xmx memory setting. */
public static final LimitedResource OFF_HEAP_MEMORY = new Global(
"free system memory",
"run on a machine with more RAM or decrease JVM -Xmx setting",
ProcessInfo::getSystemFreeMemoryBytes
);
/** Memory used by {@link java.nio.ByteBuffer#allocateDirect(int)}. */
public static final LimitedResource DIRECT_MEMORY = new Global(
"JVM direct memory",
"increase JVM -XX:MaxDirectMemorySize setting",
() -> OptionalLong.of(ProcessInfo.getMaxMemoryBytes())
);
/** Memory used by the JVM heap. */
public static final LimitedResource HEAP = new Global(
"JVM heap",
"increase JVM -Xmx setting",
() -> OptionalLong.of(ProcessInfo.getMaxMemoryBytes())
);
private static final Format FORMAT = Format.defaultInstance();
private static final Logger LOGGER = LoggerFactory.getLogger(ResourceUsage.class);
private final CopyOnWriteArrayList<Usage> usages = new CopyOnWriteArrayList<>();
private final String description;
/** Creates a new resource checker for a part of execution described by {@code description}. */
public ResourceUsage(String description) {
this.description = description;
}
/** Returns the total amount of disk requested so far. */
public long diskUsage() {
return usages.stream()
.filter(d -> d.resource instanceof DiskUsage)
.mapToLong(d -> d.amount)
.sum();
}
/** Returns the total amount requested so far for {@code types}. */
public long get(LimitedResource... types) {
var typeList = List.of(types);
return usages.stream()
.filter(d -> typeList.contains(d.resource))
.mapToLong(d -> d.amount)
.sum();
}
/** Requests {@code amount} bytes on the file system that contains {@code path}. */
public ResourceUsage addDisk(Path path, long amount, String description) {
return add(new DiskUsage(path), amount, description);
}
/** Requests {@code amount} bytes of RAM in the JVM heap. */
public ResourceUsage addMemory(long amount, String description) {
return add(HEAP, amount, description);
}
/** Requests {@code amount} bytes of direct memory that will exist outside the JVM heap. */
public ResourceUsage addDirectMemory(long amount, String description) {
return add(DIRECT_MEMORY, amount, description)
.add(OFF_HEAP_MEMORY, amount, description);
}
/** Requests {@code amount} bytes using {@code storage}. */
public ResourceUsage add(Path path, Storage storage, long amount, String description) {
return switch (storage) {
case RAM -> addMemory(amount, description);
case DIRECT -> addDirectMemory(amount, description);
case MMAP -> addDisk(path, amount, description);
};
}
/** Requests {@code amount} bytes of {@code resource}. */
public ResourceUsage add(LimitedResource resource, long amount, String description) {
return add(new Usage(resource, amount, description));
}
public ResourceUsage add(Usage usage) {
if (usage.amount != 0) {
usages.add(usage);
}
return this;
}
/** Adds all resource requests in {@code other} to this instance and returns it for chaining. */
public ResourceUsage addAll(ResourceUsage other) {
usages.addAll(other.usages);
return this;
}
/**
* Adds up all the resource requests and logs an error if any exceed the limit for that resource.
*
* @param force   If false, throws an exception when a limit is exceeded, otherwise just logs a warning
* @param verbose If true, logs each resource request even if it is under the limit
*/
public void checkAgainstLimits(boolean force, boolean verbose) {
List<String> issues = new ArrayList<>();
var grouped = usages.stream().collect(Collectors.groupingBy(Usage::resource));
for (var entry : grouped.entrySet()) {
LimitedResource resource = entry.getKey();
List<Usage> usages = entry.getValue();
long requested = usages.stream().mapToLong(Usage::amount).sum();
String requestedString = FORMAT.storage(requested);
OptionalLong limitMaybe = resource.limit();
if (limitMaybe.isEmpty()) {
LOGGER
.warn(requestedString + " requested but unable to get limit for " + resource.description() + ", may fail.");
} else {
long limit = limitMaybe.getAsLong();
String limitString = FORMAT.storage(limit);
String summary = requestedString + " " + resource.description() + " requested for " + description + ", " +
limitString + " available";
if (limit < requested) {
LOGGER.warn("❌️ " + summary + (resource instanceof Fixable fixable ? " (" + fixable.howToFix() + ")" : ""));
for (var usage : usages) {
LOGGER.warn(" - " + FORMAT.storage(usage.amount) + " used for " + usage.description);
}
} else if (limit < requested * 1.1) {
LOGGER.info("⚠️️ " + summary + (resource instanceof Fixable fixable ? " (" + fixable.howToFix() + ")" : ""));
for (var usage : usages) {
LOGGER.info(" - " + FORMAT.storage(usage.amount) + " used for " + usage.description);
}
} else if (verbose) {
LOGGER.debug("✓ " + summary);
for (var usage : usages) {
LOGGER.debug(" - " + FORMAT.storage(usage.amount) + " used for " + usage.description);
}
}
if (limit < requested) {
issues.add(summary);
}
}
}
if (!force && !issues.isEmpty()) {
throw new IllegalStateException("Insufficient resources for " + description +
", use the --force argument to continue anyway:\n" + String.join("\n", issues));
}
}
/** A resource with instructions for increasing. */
public interface Fixable {
/** Instructions to increase a resource. */
String howToFix();
}
/** A resource (like RAM or disk space) that has a fixed limit on this system. */
public interface LimitedResource {
/** The total amount of this resource available, or empty if unable to determine the limit. */
OptionalLong limit();
String description();
}
/** An amount of a resource that has been requested. */
public record Usage(LimitedResource resource, long amount, String description) {}
/** A shared global resource on this system. */
public record Global(
@Override String description,
@Override String howToFix,
@Override Supplier<OptionalLong> limitProvider
) implements LimitedResource, Fixable {
@Override
public OptionalLong limit() {
return limitProvider.get();
}
}
/** Limited disk space on {@code fileStore}. */
public record DiskUsage(FileStore fileStore) implements LimitedResource {
/** Finds the {@link FileStore} that {@code path} will exist on. */
DiskUsage(Path path) {
this(FileUtils.getFileStore(path));
}
@Override
public OptionalLong limit() {
try {
return OptionalLong.of(fileStore.getUnallocatedSpace());
} catch (IOException e) {
return OptionalLong.empty();
}
}
@Override
public String description() {
return "storage on " + fileStore.toString();
}
}
}
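
A hedged end-to-end sketch of the checker (sizes and path are hypothetical), mirroring how the downloader above registers its disk needs before starting:

ResourceUsage check = new ResourceUsage("osm import");
check.addDisk(Path.of("/data/tmp"), 70L << 30, "node location cache")
  .addMemory(10L << 30, "relation info")
  .addDirectMemory(8L << 30, "direct node map");
// throws unless --force when any limit is exceeded; verbose logs each request
check.checkAgainstLimits(false, true);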


@ -0,0 +1,48 @@
package com.onthegomap.planetiler.util;
import com.google.common.util.concurrent.Monitor;
import java.util.concurrent.atomic.AtomicLong;
/**
* Lets multiple threads work on a sliding window of values.
*
* <p>
* Calls to {@link #waitUntilInsideWindow(long)} block until the tail (moved by {@link #advanceTail(long)}) is within
* {@code windowSize} of the requested value.
*/
public class SlidingWindow {
private final AtomicLong tail = new AtomicLong(0);
private final Monitor monitor = new Monitor();
private final long windowSize;
public SlidingWindow(long windowSize) {
this.windowSize = windowSize;
}
/**
* Moves the tail to {@code to}, unblocking any thread waiting for a value less than {@code to + windowSize}.
*/
public void advanceTail(long to) {
monitor.enter();
try {
if (to < tail.get()) {
throw new IllegalStateException("Tried to move sliding window tail backwards from " + tail + " to " + to);
}
tail.set(to);
} finally {
monitor.leave();
}
}
/** Blocks until another thread moves the tail to within {@code windowSize} of {@code to}. */
public void waitUntilInsideWindow(long to) {
try {
monitor.enterWhen(monitor.newGuard(() -> to - tail.longValue() < windowSize));
monitor.leave();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
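
A minimal two-thread sketch (the window size of 10 is hypothetical) of the back-pressure this provides:

public static void main(String[] args) throws InterruptedException {
  SlidingWindow window = new SlidingWindow(10);
  Thread consumer = new Thread(() -> {
    for (long i = 0; i <= 100; i++) {
      window.advanceTail(i); // consumer marks item i complete
    }
  });
  consumer.start();
  window.waitUntilInsideWindow(50); // returns once the tail passes 40
  consumer.join();
}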


@ -7,4 +7,12 @@ package com.onthegomap.planetiler.worker;
public interface RunnableThatThrows {
void run() throws Exception;
default void runAndWrapException() {
try {
run();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
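
For example (a sketch; the checked IOException is rethrown wrapped):

RunnableThatThrows cleanup = () -> java.nio.file.Files.deleteIfExists(java.nio.file.Path.of("tmp.bin"));
cleanup.runAndWrapException(); // any IOException surfaces as an unchecked RuntimeException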


@ -51,6 +51,7 @@ public class Worker {
stats.timers().finishedWorker(prefix, Duration.ofNanos(System.nanoTime() - start));
} catch (Throwable e) {
System.err.println("Worker " + id + " died");
e.printStackTrace();
throwRuntimeException(e);
} finally {
LOGGER.trace("Finished worker");

Wyświetl plik

@ -16,7 +16,7 @@ import java.util.function.Consumer;
* A mini-framework for chaining sequential steps that run in dedicated threads with a queue between each.
* <p>
* For example:
*
*
* <pre>
* {@code
* WorkerPipeline.start("name", stats)
@ -59,7 +59,6 @@ public record WorkerPipeline<T> (
*/
public void awaitAndLog(ProgressLoggers loggers, Duration logInterval) {
loggers.awaitAndLog(done, logInterval);
loggers.log();
}
/**

Wyświetl plik

@ -0,0 +1,8 @@
package com.onthegomap.planetiler;
/** An exception intentionally thrown by a test. */
public class ExpectedException extends Error {
public ExpectedException() {
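// Throwable's four-arg constructor: message, cause, enableSuppression, writableStackTrace.
// Passing writableStackTrace=false skips stack-trace capture, keeping these
// intentionally-thrown test errors cheap to construct.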
super("expected exception", null, true, false);
}
}

Wyświetl plik

@ -810,7 +810,7 @@ public class PlanetilerTests {
with(new OsmElement.Node(1, 0, 0), t -> t.setTag("attr", "value"))
),
(in, features) -> {
throw new Error();
throw new ExpectedException();
}
));
}

Wyświetl plik

@ -72,7 +72,7 @@ public class AppendStoreTest {
@BeforeEach
public void setup() {
this.store = new AppendStoreRam.Ints(4 << 2);
this.store = new AppendStoreRam.Ints(false, 4 << 2);
}
}
@ -84,11 +84,19 @@ public class AppendStoreTest {
}
}
static class DirectInt extends IntsTest {
@BeforeEach
public void setup() {
this.store = new AppendStoreRam.Ints(true, 4 << 2);
}
}
static class RamLong extends LongsTest {
@BeforeEach
public void setup() {
this.store = new AppendStoreRam.Longs(4 << 2);
this.store = new AppendStoreRam.Longs(false, 4 << 2);
}
}
@ -100,6 +108,14 @@ public class AppendStoreTest {
}
}
static class DirectLong extends LongsTest {
@BeforeEach
public void setup() {
this.store = new AppendStoreRam.Longs(true, 4 << 2);
}
}
static class MMapSmallLong extends LongsTest {
@BeforeEach
@ -113,7 +129,15 @@ public class AppendStoreTest {
@BeforeEach
public void setup() {
this.store = new AppendStore.SmallLongs((i) -> new AppendStoreRam.Ints(4 << 2));
this.store = new AppendStore.SmallLongs((i) -> new AppendStoreRam.Ints(false, 4 << 2));
}
}
static class DirectSmallLong extends LongsTest {
@BeforeEach
public void setup() {
this.store = new AppendStore.SmallLongs((i) -> new AppendStoreRam.Ints(true, 4 << 2));
}
}
}

Wyświetl plik

@ -2,57 +2,77 @@ package com.onthegomap.planetiler.collection;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.onthegomap.planetiler.util.Format;
import com.onthegomap.planetiler.util.ResourceUsage;
import java.nio.file.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
public abstract class LongLongMapTest {
protected LongLongMap map;
private LongLongMap.SequentialWrites sequential;
protected abstract LongLongMap.SequentialWrites createSequentialWriter(Path path);
@BeforeEach
public void setupSequentialWriter(@TempDir Path path) {
this.sequential = createSequentialWriter(path);
}
@Test
public void missingValue() {
assertEquals(Long.MIN_VALUE, map.get(0));
assertEquals(Long.MIN_VALUE, sequential.get(0));
}
@Test
public void insertLookup() {
map.put(1, 1);
assertEquals(Long.MIN_VALUE, map.get(0));
assertEquals(1, map.get(1));
assertEquals(Long.MIN_VALUE, map.get(2));
sequential.put(1, 1);
assertEquals(Long.MIN_VALUE, sequential.get(0));
assertEquals(1, sequential.get(1));
assertEquals(Long.MIN_VALUE, sequential.get(2));
}
@Test
public void insertLookupHighValue() {
sequential.put(1_000_000, 1);
assertEquals(Long.MIN_VALUE, sequential.get(999_999));
assertEquals(1, sequential.get(1_000_000));
assertEquals(Long.MIN_VALUE, sequential.get(1_000_001));
}
@Test
public void insertWithGaps() {
map.put(1, 2);
map.put(50, 3);
map.put(500, 4);
map.put(505, 5);
assertEquals(Long.MIN_VALUE, map.get(0));
assertEquals(2, map.get(1));
assertEquals(Long.MIN_VALUE, map.get(2));
assertEquals(Long.MIN_VALUE, map.get(49));
assertEquals(3, map.get(50));
assertEquals(Long.MIN_VALUE, map.get(51));
assertEquals(Long.MIN_VALUE, map.get(300));
assertEquals(Long.MIN_VALUE, map.get(499));
assertEquals(4, map.get(500));
assertEquals(Long.MIN_VALUE, map.get(501));
assertEquals(5, map.get(505));
assertEquals(Long.MIN_VALUE, map.get(506));
assertEquals(Long.MIN_VALUE, map.get(1_000));
sequential.put(1, 2);
sequential.put(50, 3);
sequential.put(500, 4);
sequential.put(505, 5);
assertEquals(Long.MIN_VALUE, sequential.get(0));
assertEquals(2, sequential.get(1));
assertEquals(Long.MIN_VALUE, sequential.get(2));
assertEquals(Long.MIN_VALUE, sequential.get(49));
assertEquals(3, sequential.get(50));
assertEquals(Long.MIN_VALUE, sequential.get(51));
assertEquals(Long.MIN_VALUE, sequential.get(300));
assertEquals(Long.MIN_VALUE, sequential.get(499));
assertEquals(4, sequential.get(500));
assertEquals(Long.MIN_VALUE, sequential.get(501));
assertEquals(5, sequential.get(505));
assertEquals(Long.MIN_VALUE, sequential.get(506));
assertEquals(Long.MIN_VALUE, sequential.get(1_000));
}
@Test
public void insertMultiLookup() {
map.put(1, 3);
map.put(2, 4);
map.put(1_000_000_000, Long.MAX_VALUE);
assertEquals(Long.MIN_VALUE, map.get(0));
assertEquals(Long.MIN_VALUE, map.get(3));
sequential.put(1, 3);
sequential.put(2, 4);
sequential.put(1_000_000, Long.MAX_VALUE);
assertEquals(Long.MIN_VALUE, sequential.get(0));
assertEquals(Long.MIN_VALUE, sequential.get(3));
assertArrayEquals(new long[]{3, 4, Long.MAX_VALUE, Long.MIN_VALUE},
map.multiGet(new long[]{1, 2, 1_000_000_000, 3}));
sequential.multiGet(new long[]{1, 2, 1_000_000, 3}));
}
@Test
@ -60,34 +80,75 @@ public abstract class LongLongMapTest {
long[] key = new long[50000];
long[] expected = new long[50000];
for (int i = 0; i < 50000; i++) {
map.put(i * 4, i + 1);
sequential.put(i * 4, i + 1);
key[i] = i * 4;
expected[i] = i + 1;
}
long[] result = map.multiGet(key);
long[] result = sequential.multiGet(key);
assertArrayEquals(expected, result);
}
public static class SortedTable extends LongLongMapTest {
public static class SortedTableTest extends LongLongMapTest {
@BeforeEach
public void setup() {
this.map = new LongLongMap.SortedTable(
@Override
protected LongLongMap.SequentialWrites createSequentialWriter(Path path) {
return new SortedTableLongLongMap(
new AppendStore.SmallLongs(
i -> new AppendStoreRam.Ints()
i -> new AppendStoreRam.Ints(false)
),
new AppendStoreRam.Longs()
new AppendStoreRam.Longs(false)
);
}
}
public static class SparseArray3 extends LongLongMapTest {
public static class SparseArrayTest extends LongLongMapTest {
@BeforeEach
public void setup() {
this.map = new LongLongMap.SparseArray(new AppendStoreRam.Longs());
@Override
protected LongLongMap.SequentialWrites createSequentialWriter(Path path) {
return new SparseArrayLongLongMap(new AppendStoreRam.Longs(false));
}
}
public static class DirectTest extends LongLongMapTest {
@Override
protected LongLongMap.SequentialWrites createSequentialWriter(Path path) {
return new SparseArrayLongLongMap(new AppendStoreRam.Longs(true));
}
}
public static class AllTest {
@Test
public void testAllImplementations(@TempDir Path path) {
for (Storage storage : Storage.values()) {
for (LongLongMap.Type type : LongLongMap.Type.values()) {
var variant = storage + "-" + type;
var params = new Storage.Params(path.resolve(variant), true);
var estimatedSize = LongLongMap.estimateStorageRequired(type, storage, 70_000_000_000L, params.path());
var usage = storage == Storage.MMAP ? estimatedSize.diskUsage() :
estimatedSize.get(
storage == Storage.DIRECT ? ResourceUsage.DIRECT_MEMORY : ResourceUsage.HEAP
);
var sizeDescription = variant + " " + Format.defaultInstance().storage(usage);
// sanity check to ensure that the estimated size is between 60GB and 100GB for a 70GB input file
if (type != LongLongMap.Type.NOOP) {
assertTrue(usage > 60_000_000_000L, sizeDescription);
assertTrue(usage < 100_000_000_000L, sizeDescription);
}
LongLongMap map = LongLongMap.from(type, storage, params);
try (var writer = map.newWriter()) {
writer.put(2, 3);
writer.put(4, 5);
}
if (type != LongLongMap.Type.NOOP) {
assertEquals(3, map.get(2), variant);
assertEquals(5, map.get(4), variant);
}
}
}
}
}
}

Wyświetl plik

@ -0,0 +1,180 @@
package com.onthegomap.planetiler.collection;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
public abstract class ParallelLongLongMapTest extends LongLongMapTest {
private LongLongMap.ParallelWrites parallel;
protected abstract LongLongMap.ParallelWrites create(Path path);
@Test
@Timeout(10)
public void testWaitForBothWritersToClose() throws InterruptedException {
var writer1 = parallel.newWriter();
var writer2 = parallel.newWriter();
writer1.put(0, 1);
writer1.close();
writer2.put(1, 2);
writer2.close();
assertEquals(1, parallel.get(0));
assertEquals(2, parallel.get(1));
}
@Test
@Timeout(10)
public void testInterleavedWritesFromParallelThreads() throws InterruptedException {
int limit = 1000;
var ready = new CyclicBarrier(2);
Thread thread1 = new Thread(() -> {
try (var writer = parallel.newWriter()) {
ready.await();
for (int i = 1; i < limit; i++) {
writer.put(i * 2, i * 2);
}
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
Thread thread2 = new Thread(() -> {
try (var writer = parallel.newWriter()) {
ready.await();
for (int i = 1; i < limit; i++) {
writer.put(i * 10 + 1, i * 3);
}
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
assertEquals(Long.MIN_VALUE, parallel.get(0));
for (int i = 1; i < limit; i++) {
assertEquals(i * 2, parallel.get(i * 2), "item:" + i * 2);
assertEquals(i * 3, parallel.get(i * 10 + 1), "item:" + (i * 10 + 1));
}
}
@Test
@Timeout(10)
public void testAdjacentBlocksFromParallelThreads() throws InterruptedException {
int limit = 1000;
var ready = new CyclicBarrier(2);
Thread thread1 = new Thread(() -> {
try (var writer = parallel.newWriter()) {
ready.await();
for (int i = 1; i < limit; i++) {
writer.put(i, i);
}
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
Thread thread2 = new Thread(() -> {
try (var writer = parallel.newWriter()) {
ready.await();
for (int i = 1; i < limit * 2; i++) {
writer.put(i + limit, i * 2L);
}
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
assertEquals(Long.MIN_VALUE, parallel.get(0));
for (int i = 1; i < limit; i++) {
assertEquals(i, parallel.get(i), "item:" + i);
assertEquals(i * 2, parallel.get(i + limit), "item:" + (i + limit));
}
}
@BeforeEach
public void setupParallelWriter(@TempDir Path path) {
this.parallel = create(path);
}
@Override
protected LongLongMap.SequentialWrites createSequentialWriter(Path path) {
var sequentialMap = create(path);
var writer = sequentialMap.newWriter();
return new LongLongMap.SequentialWrites() {
@Override
public void put(long key, long value) {
writer.put(key, value);
}
@Override
public long get(long key) {
return sequentialMap.get(key);
}
@Override
public void close() throws IOException {
sequentialMap.close();
writer.close();
}
};
}
public static class ArrayMmapLargeTest extends ParallelLongLongMapTest {
@Override
protected LongLongMap.ParallelWrites create(Path path) {
return new ArrayLongLongMapMmap(path.resolve("node.db"), 20, 2, true);
}
}
public static class ArrayMmapSmallTest extends ParallelLongLongMapTest {
@Override
protected LongLongMap.ParallelWrites create(Path path) {
return new ArrayLongLongMapMmap(path.resolve("node.db"), 4, 2, true);
}
}
public static class ArrayDirectSmallTest extends ParallelLongLongMapTest {
@Override
protected LongLongMap.ParallelWrites create(Path path) {
return new ArrayLongLongMapRam(true, 4);
}
}
public static class ArrayDirectLargeTest extends ParallelLongLongMapTest {
@Override
protected LongLongMap.ParallelWrites create(Path path) {
return new ArrayLongLongMapRam(true, 10);
}
}
public static class ArrayRamLargeTest extends ParallelLongLongMapTest {
@Override
protected LongLongMap.ParallelWrites create(Path path) {
return new ArrayLongLongMapRam(false, 10);
}
}
public static class ArrayRamSmallTest extends ParallelLongLongMapTest {
@Override
protected LongLongMap.ParallelWrites create(Path path) {
return new ArrayLongLongMapRam(false, 4);
}
}
}

Wyświetl plik

@ -0,0 +1,160 @@
package com.onthegomap.planetiler.reader.osm;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
public class OsmPhaserTest {
@Test
@Timeout(1)
public void testAdvanceSingleThread() {
var phaser = new OsmPhaser(0);
var forWorker = phaser.forWorker();
assertEquals(OsmPhaser.Phase.BEGIN, phaser.getPhase());
// multiple calls to the same phase stay put
forWorker.arrive(OsmPhaser.Phase.NODES);
assertEquals(OsmPhaser.Phase.NODES, phaser.getPhase());
assertEquals(0, phaser.nodes());
forWorker.arrive(OsmPhaser.Phase.NODES);
assertEquals(OsmPhaser.Phase.NODES, phaser.getPhase());
assertEquals(1, phaser.nodes());
forWorker.arriveAndWaitForOthers(OsmPhaser.Phase.WAYS);
assertEquals(2, phaser.nodes());
assertEquals(OsmPhaser.Phase.WAYS, phaser.getPhase());
forWorker.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
assertEquals(OsmPhaser.Phase.RELATIONS, phaser.getPhase());
forWorker.close();
assertEquals(OsmPhaser.Phase.DONE, phaser.getPhase());
assertEquals(2, phaser.nodes());
assertEquals(1, phaser.ways());
assertEquals(1, phaser.relations());
}
@Test
@Timeout(1)
public void testAdvanceAndSkipPhases() {
var phaser = new OsmPhaser(1);
var forWorker = phaser.forWorker();
assertEquals(OsmPhaser.Phase.BEGIN, phaser.getPhase());
forWorker.arrive(OsmPhaser.Phase.NODES);
assertEquals(0, phaser.nodes(), "don't increment count before processing an element");
assertEquals(OsmPhaser.Phase.NODES, phaser.getPhase());
forWorker.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
assertEquals(1, phaser.nodes(), "increment count until after processing an element");
assertEquals(OsmPhaser.Phase.RELATIONS, phaser.getPhase());
forWorker.close();
assertEquals(1, phaser.nodes());
assertEquals(0, phaser.ways());
assertEquals(1, phaser.relations());
assertEquals(OsmPhaser.Phase.DONE, phaser.getPhase());
}
@Test
@Timeout(1)
public void testWorkerAdvanceSideEffect() {
var nodesCalled = new AtomicBoolean(false);
var waysCalled = new AtomicBoolean(false);
var phaser = new OsmPhaser(0);
var forWorker = phaser.forWorker()
.whenWorkerFinishes(OsmPhaser.Phase.NODES, () -> nodesCalled.set(true))
.whenWorkerFinishes(OsmPhaser.Phase.WAYS, () -> waysCalled.set(true));
assertFalse(nodesCalled.get());
assertFalse(waysCalled.get());
forWorker.arrive(OsmPhaser.Phase.WAYS);
assertTrue(nodesCalled.get());
assertFalse(waysCalled.get());
forWorker.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
assertTrue(waysCalled.get());
forWorker.close();
}
@Test
@Timeout(1)
public void testCantGoBackwards() {
var phaser = new OsmPhaser(1);
var forWorker = phaser.forWorker();
forWorker.arrive(OsmPhaser.Phase.WAYS);
assertThrows(IllegalStateException.class, () -> forWorker.arrive(OsmPhaser.Phase.NODES));
forWorker.close();
assertThrows(IllegalStateException.class, () -> forWorker.arrive(OsmPhaser.Phase.RELATIONS));
}
@Test
@Timeout(1)
public void testWaitToAdvance() throws InterruptedException {
var phaser = new OsmPhaser(2);
var latch1 = new CountDownLatch(1);
var latch2 = new CountDownLatch(1);
var latch3 = new CountDownLatch(1);
var workingOnRelations = new AtomicBoolean(false);
var workingOnWays = new AtomicBoolean(false);
Thread t1 = new Thread(() -> {
try (var worker = phaser.forWorker()) {
worker.arrive(OsmPhaser.Phase.NODES);
worker.arrive(OsmPhaser.Phase.WAYS);
latch2.countDown();
workingOnWays.set(true);
worker.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
workingOnRelations.set(true);
worker.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
worker.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
} catch (Exception e) {
e.printStackTrace();
}
});
Thread t2 = new Thread(() -> {
try (var worker = phaser.forWorker()) {
worker.arrive(OsmPhaser.Phase.NODES);
latch1.await();
worker.arrive(OsmPhaser.Phase.WAYS);
latch3.await();
worker.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
worker.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
worker.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
} catch (Exception e) {
e.printStackTrace();
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
t1.start();
t2.start();
latch2.await(); // proves that thread 1 advanced to ways before thread 2 did
latch1.countDown(); // let thread 2 continue on to ways
// wait and make sure thread 1 is still waiting on arriveAndWaitForOthers(RELATIONS)
Thread.sleep(100);
assertTrue(workingOnWays.get());
assertFalse(workingOnRelations.get());
latch3.countDown(); // now let thread 2 move onto relations, unblock thread 1 and they both finish
t1.join();
t2.join();
assertEquals(OsmPhaser.Phase.DONE, phaser.getPhase());
assertEquals(2, phaser.nodes());
assertEquals(2, phaser.ways());
assertEquals(6, phaser.relations());
}
}

Wyświetl plik

@ -29,13 +29,16 @@ public class OsmReaderTest {
private final Profile profile = new Profile.NullProfile();
private final LongLongMap nodeMap = LongLongMap.newInMemorySortedTable();
private void processPass1Block(OsmReader reader, Iterable<OsmElement> block) {
reader.processPass1Blocks(List.of(block));
}
@Test
public void testPoint() throws GeometryException {
OsmReader reader = newOsmReader();
var node = new OsmElement.Node(1, 0, 0);
node.setTag("key", "value");
reader.processPass1Block(List.of(node));
processPass1Block(reader, List.of(node));
SourceFeature feature = reader.processNodePass2(node);
assertTrue(feature.isPoint());
assertFalse(feature.canBePolygon());
@ -64,7 +67,7 @@ public class OsmReaderTest {
way.nodes().add(node1.id(), node2.id());
way.setTag("key", "value");
reader.processPass1Block(List.of(node1, node2, way));
processPass1Block(reader, List.of(node1, node2, way));
SourceFeature feature = reader.processWayPass2(way, nodeCache);
assertTrue(feature.canBeLine());
@ -104,7 +107,7 @@ public class OsmReaderTest {
way.nodes().add(1, 2, 3, 4, 1);
way.setTag("key", "value");
reader.processPass1Block(List.of(node1, node2, node3, node4, way));
processPass1Block(reader, List.of(node1, node2, node3, node4, way));
SourceFeature feature = reader.processWayPass2(way, nodeCache);
assertTrue(feature.canBeLine());
@ -143,7 +146,7 @@ public class OsmReaderTest {
way.nodes().add(1, 2, 3, 4, 1);
way.setTag("area", "yes");
reader.processPass1Block(List.of(node1, node2, node3, node4, way));
processPass1Block(reader, List.of(node1, node2, node3, node4, way));
SourceFeature feature = reader.processWayPass2(way, nodeCache);
assertFalse(feature.canBeLine());
@ -179,7 +182,7 @@ public class OsmReaderTest {
way.nodes().add(1, 2, 3, 4, 1);
way.setTag("area", "no");
reader.processPass1Block(List.of(node1, node2, node3, node4, way));
processPass1Block(reader, List.of(node1, node2, node3, node4, way));
SourceFeature feature = reader.processWayPass2(way, nodeCache);
assertTrue(feature.canBeLine());
@ -210,7 +213,7 @@ public class OsmReaderTest {
var way = new OsmElement.Way(3);
way.nodes().add(1);
reader.processPass1Block(List.of(node1, way));
processPass1Block(reader, List.of(node1, way));
SourceFeature feature = reader.processWayPass2(way, reader.newNodeLocationProvider());
assertFalse(feature.canBeLine());
@ -235,7 +238,7 @@ public class OsmReaderTest {
var way = new OsmElement.Way(3);
way.nodes().add(1, 2, 1);
reader.processPass1Block(List.of(node1, node2, way));
processPass1Block(reader, List.of(node1, node2, way));
SourceFeature feature = reader.processWayPass2(way, reader.newNodeLocationProvider());
assertTrue(feature.canBeLine());
@ -266,7 +269,7 @@ public class OsmReaderTest {
public void testInvalidPolygon() throws GeometryException {
OsmReader reader = newOsmReader();
reader.processPass1Block(List.of(
processPass1Block(reader, List.of(
node(1, 0.5, 0.5),
node(2, 0.75, 0.5),
node(3, 0.5, 0.75),
@ -275,7 +278,7 @@ public class OsmReaderTest {
var way = new OsmElement.Way(6);
way.setTag("area", "yes");
way.nodes().add(1, 2, 3, 4, 1);
reader.processPass1Block(List.of(way));
processPass1Block(reader, List.of(way));
SourceFeature feature = reader.processWayPass2(way, reader.newNodeLocationProvider());
assertFalse(feature.canBeLine());
@ -313,7 +316,7 @@ public class OsmReaderTest {
OsmReader reader = newOsmReader();
var way = new OsmElement.Way(321);
way.nodes().add(123, 2222, 333, 444, 123);
reader.processPass1Block(List.of(way));
processPass1Block(reader, List.of(way));
SourceFeature feature = reader.processWayPass2(way, reader.newNodeLocationProvider());
assertTrue(feature.canBeLine());
@ -369,7 +372,7 @@ public class OsmReaderTest {
relation
);
reader.processPass1Block(elements);
processPass1Block(reader, elements);
elements.stream().flatMap(nodes).forEach(reader::processNodePass2);
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
@ -441,7 +444,7 @@ public class OsmReaderTest {
relation
);
reader.processPass1Block(elements);
processPass1Block(reader, elements);
elements.stream().flatMap(nodes).forEach(reader::processNodePass2);
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
@ -507,7 +510,7 @@ public class OsmReaderTest {
relation
);
reader.processPass1Block(elements);
processPass1Block(reader, elements);
elements.stream().flatMap(nodes).forEach(reader::processNodePass2);
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
@ -558,7 +561,7 @@ public class OsmReaderTest {
relation
);
reader.processPass1Block(elements);
processPass1Block(reader, elements);
elements.stream().flatMap(nodes).forEach(reader::processNodePass2);
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
@ -589,7 +592,7 @@ public class OsmReaderTest {
relation
);
reader.processPass1Block(elements);
processPass1Block(reader, elements);
elements.stream().flatMap(nodes).forEach(reader::processNodePass2);
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
@ -620,7 +623,7 @@ public class OsmReaderTest {
var relation = new OsmElement.Relation(4);
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, 3, "rolename"));
reader.processPass1Block(List.of(node1, node2, way, relation));
processPass1Block(reader, List.of(node1, node2, way, relation));
SourceFeature feature = reader.processWayPass2(way, nodeCache);
@ -648,7 +651,7 @@ public class OsmReaderTest {
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.RELATION, 3, "rolename"));
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.NODE, 3, "rolename"));
reader.processPass1Block(List.of(node1, node2, way, relation));
processPass1Block(reader, List.of(node1, node2, way, relation));
SourceFeature feature = reader.processWayPass2(way, nodeCache);

Wyświetl plik

@ -14,6 +14,26 @@ public class ProcessInfoTest {
assertTrue(ProcessInfo.getGcTime().toNanos() >= 0);
}
@Test
public void testGetDirectUsedMemoryBytes() {
assertTrue(ProcessInfo.getDirectUsedMemoryBytes() >= 0);
}
@Test
public void testGetDirectUsedMemoryLimit() {
assertTrue(ProcessInfo.getDirectUsedMemoryLimit() >= 0);
}
@Test
public void testGetOnHeapUsedMemoryBytes() {
assertTrue(ProcessInfo.getOnHeapUsedMemoryBytes() >= 0);
}
@Test
public void testGetSystemUsedMemoryBytes() {
assertTrue(ProcessInfo.getSystemMemoryBytes().getAsLong() >= 0);
}
@Test
public void testCPU() {
assertFalse(ProcessInfo.getProcessCpuTime().isEmpty());

Wyświetl plik

@ -0,0 +1,110 @@
package com.onthegomap.planetiler.util;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Locale;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
public class ByteBufferUtilTest {
@Test
public void testMadviseAndUnmap(@TempDir Path dir) throws IOException {
String osName = System.getProperty("os.name", "").toLowerCase(Locale.ROOT);
String data = "test";
int bytes = data.getBytes(StandardCharsets.UTF_8).length;
var path = dir.resolve("file");
Files.writeString(path, data, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, bytes);
try {
ByteBufferUtil.posixMadvise(buffer, ByteBufferUtil.Madvice.RANDOM);
byte[] received = new byte[bytes];
buffer.get(received);
assertEquals(data, new String(received, StandardCharsets.UTF_8));
} catch (IOException e) {
if (osName.startsWith("mac") || osName.startsWith("linux")) {
throw e;
} else {
System.out.println("madvise failed, but the system may not support it");
}
} finally {
ByteBufferUtil.free(buffer);
}
} finally {
Files.delete(path);
}
}
@Test
public void testFreeDirectByteBuffer() throws IOException {
ByteBufferUtil.free(ByteBuffer.allocateDirect(1));
}
@Test
public void testFreeHeapByteBuffer() throws IOException {
ByteBufferUtil.free(ByteBuffer.allocate(1));
}
private String readString(MappedByteBuffer buffer) {
byte[] result = new byte[buffer.limit()];
buffer.get(result);
return new String(result, StandardCharsets.UTF_8);
}
@Test
public void testMapFile(@TempDir Path dir) throws IOException {
String data = "test";
var path = dir.resolve("file");
Files.writeString(path, data, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var channel = FileChannel.open(path, StandardOpenOption.READ);
MappedByteBuffer[] buffers = ByteBufferUtil.mapFile(channel, 4, 2, true);
assertEquals(2, buffers.length);
assertEquals("te", readString(buffers[0]));
assertEquals("st", readString(buffers[1]));
ByteBufferUtil.free(buffers);
}
@Test
public void testMapFileLeftoverSegment(@TempDir Path dir) throws IOException {
String data = "test!";
var path = dir.resolve("file");
Files.writeString(path, data, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var channel = FileChannel.open(path, StandardOpenOption.READ);
MappedByteBuffer[] buffers = ByteBufferUtil.mapFile(channel, 5, 2, true);
assertEquals(3, buffers.length);
assertEquals("te", readString(buffers[0]));
assertEquals("st", readString(buffers[1]));
assertEquals("!", readString(buffers[2]));
ByteBufferUtil.free(buffers);
}
@Test
public void testMapFileFilterOutSegment(@TempDir Path dir) throws IOException {
String data = "test!";
var path = dir.resolve("file");
Files.writeString(path, data, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var channel = FileChannel.open(path, StandardOpenOption.READ);
MappedByteBuffer[] buffers = ByteBufferUtil.mapFile(channel, 5, 2, true, i -> i != 0);
assertEquals(3, buffers.length);
assertNull(buffers[0]);
assertNotNull(buffers[1]);
assertNotNull(buffers[2]);
ByteBufferUtil.free(buffers);
}
}
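
Taken together, the three mapFile tests pin down its contract: the file splits into ceil(fileSize / segmentBytes) mapped segments, a trailing partial segment gets a shorter buffer, and the optional index predicate leaves filtered-out entries null. A sketch that maps only the middle segment of the 5-byte file from the tests above (path is assumed to point at that file):

    try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
      // keep only segment index 1 ("st"); buffers[0] and buffers[2] stay null
      MappedByteBuffer[] buffers = ByteBufferUtil.mapFile(channel, 5, 2, true, i -> i == 1);
      byte[] segment = new byte[buffers[1].limit()];
      buffers[1].get(segment);
      ByteBufferUtil.free(buffers); // free() tolerates the null entries, as in the tests
    }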

Wyświetl plik

@ -146,7 +146,7 @@ public class DownloaderTest {
var resource1 = new Downloader.ResourceToDownload("resource", url, dest);
var exception = assertThrows(ExecutionException.class, () -> downloader.downloadIfNecessary(resource1).get());
assertInstanceOf(IllegalArgumentException.class, exception.getCause());
assertInstanceOf(IllegalStateException.class, exception.getCause());
assertTrue(exception.getMessage().contains("--force"), exception.getMessage());
}
}

Wyświetl plik

@ -34,4 +34,14 @@ public class FileUtilsTest {
assertEquals(0, FileUtils.size(parent));
assertEquals(0, FileUtils.size(tmpDir));
}
@Test
public void testGetFileStore() throws IOException {
var filestore = Files.getFileStore(tmpDir);
assertEquals(filestore, FileUtils.getFileStore(tmpDir.resolve("nonexistant_file")));
assertEquals(filestore, FileUtils.getFileStore(tmpDir.resolve("subdir").resolve("nonexistant_file")));
var nested = tmpDir.resolve("subdir").resolve("nonexistant_file");
FileUtils.createParentDirectories(nested);
assertEquals(filestore, FileUtils.getFileStore(nested));
}
}
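
The behavior exercised here, resolving a FileStore for a path that does not exist yet, suggests walking up to the nearest existing ancestor. A sketch of that technique (assumed, not necessarily the actual FileUtils implementation):

    // Find the FileStore a path will live on by probing existing ancestors.
    static FileStore getFileStore(Path path) {
      Path current = path.toAbsolutePath();
      while (current != null) {
        try {
          return Files.getFileStore(current);
        } catch (IOException e) {
          current = current.getParent(); // not created yet; try the parent
        }
      }
      throw new IllegalStateException("No file store found for " + path);
    }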

Wyświetl plik

@ -1,45 +0,0 @@
package com.onthegomap.planetiler.util;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Locale;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
public class MmapUtilTest {
@Test
public void testMadviseAndUnmap(@TempDir Path dir) throws IOException {
String osName = System.getProperty("os.name", "").toLowerCase(Locale.ROOT);
String data = "test";
int bytes = data.getBytes(StandardCharsets.UTF_8).length;
var path = dir.resolve("file");
Files.writeString(path, data, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, bytes);
try {
MmapUtil.madvise(buffer, MmapUtil.Madvice.RANDOM);
byte[] received = new byte[bytes];
buffer.get(received);
assertEquals(data, new String(received, StandardCharsets.UTF_8));
} catch (IOException e) {
if (osName.startsWith("mac") || osName.startsWith("linux")) {
throw e;
} else {
System.out.println("madvise failed, but the system may not support it");
}
} finally {
MmapUtil.unmap(buffer);
}
} finally {
Files.delete(path);
}
}
}

Wyświetl plik

@ -1,6 +1,7 @@
package com.onthegomap.planetiler.util;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.DynamicTest.dynamicTest;
import java.util.Map;
@ -9,6 +10,7 @@ import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
public class ParseTest {
@ -78,4 +80,23 @@ public class ParseTest {
).map(entry -> dynamicTest(entry.getKey().toString(),
() -> assertEquals(entry.getValue(), Parse.wayzorder(entry.getKey()))));
}
@ParameterizedTest
@CsvSource(value = {
"0, 0",
"1, 1",
"999999999999, 999999999999",
"2k, 2048",
"4M, 4194304",
"8G, 8589934592"
})
public void testParseJvmSize(String input, long expectedOutput) {
assertEquals(expectedOutput, Parse.jvmMemoryStringToBytes(input));
}
@ParameterizedTest
@ValueSource(strings = {"123p", "123gk", "garbage"})
public void testParseInvalidJvmSize(String input) {
assertThrows(IllegalArgumentException.class, () -> Parse.jvmMemoryStringToBytes(input));
}
}
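
A sketch of a parser consistent with these cases (the real Parse.jvmMemoryStringToBytes may differ; accepting suffixes case-insensitively mirrors the JVM's -Xmx convention and is an assumption here):

    // Sketch: "512" -> 512, "2k" -> 2048, "4M" -> 4194304, "8G" -> 8589934592;
    // anything else throws IllegalArgumentException.
    static long jvmMemoryStringToBytes(String input) {
      var matcher = java.util.regex.Pattern.compile("^(\\d+)([kKmMgG]?)$").matcher(input.trim());
      if (!matcher.matches()) {
        throw new IllegalArgumentException("Unparseable memory size: " + input);
      }
      long value = Long.parseLong(matcher.group(1));
      return switch (matcher.group(2).toLowerCase(java.util.Locale.ROOT)) {
        case "k" -> value << 10;
        case "m" -> value << 20;
        case "g" -> value << 30;
        default -> value;
      };
    }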

Wyświetl plik

@ -0,0 +1,37 @@
package com.onthegomap.planetiler.util;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.OptionalLong;
import org.junit.jupiter.api.Test;
public class ResourceUsageTest {
@Test
public void testEmpty() {
new ResourceUsage("testing it out").checkAgainstLimits(true, false);
}
@Test
public void testFakeResource() {
var resource = new ResourceUsage.Global("testing resource", "get more", () -> OptionalLong.of(10L));
var check = new ResourceUsage("testing it out")
.add(resource, 9, "description");
check.checkAgainstLimits(false, false);
check.add(resource, 2, "more");
assertThrows(IllegalStateException.class, () -> check.checkAgainstLimits(false, false));
check.checkAgainstLimits(true, false);
}
@Test
public void testTooMuchRam() {
var check = new ResourceUsage("testing it out")
.addMemory(Runtime.getRuntime().maxMemory() - 1, "test");
check.checkAgainstLimits(false, false);
check.addMemory(2, "more");
assertThrows(IllegalStateException.class, () -> check.checkAgainstLimits(false, false));
check.checkAgainstLimits(true, false);
}
}

Wyświetl plik

@ -0,0 +1,31 @@
package com.onthegomap.planetiler.util;
import static org.junit.jupiter.api.Assertions.assertFalse;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
public class SlidingWindowTests {
@Test
@Timeout(10)
public void testSlidingWindow() throws InterruptedException {
var slidingWindow = new SlidingWindow(5);
var latch1 = new CountDownLatch(1);
var latch2 = new CountDownLatch(1);
Thread t1 = new Thread(() -> {
latch1.countDown();
slidingWindow.waitUntilInsideWindow(9);
latch2.countDown();
});
t1.start();
latch1.await();
assertFalse(latch2.await(100, TimeUnit.MILLISECONDS));
slidingWindow.advanceTail(4);
assertFalse(latch2.await(100, TimeUnit.MILLISECONDS));
slidingWindow.advanceTail(5);
latch2.await();
t1.join();
}
}

Wyświetl plik

@ -3,6 +3,7 @@ package com.onthegomap.planetiler.worker;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import com.onthegomap.planetiler.ExpectedException;
import com.onthegomap.planetiler.stats.ProgressLoggers;
import com.onthegomap.planetiler.stats.Stats;
import java.time.Duration;
@ -91,7 +92,6 @@ public class WorkerPipelineTest {
@Timeout(10)
@ValueSource(ints = {1, 2, 3})
public void testThrowingExceptionInPipelineHandledGracefully(int failureStage) {
class ExpectedException extends RuntimeException {}
var pipeline = WorkerPipeline.start("test", stats)
.<Integer>fromGenerator("reader", (next) -> {
if (failureStage == 1) {

Wyświetl plik

@ -2,6 +2,7 @@ package com.onthegomap.planetiler.worker;
import static org.junit.jupiter.api.Assertions.assertThrows;
import com.onthegomap.planetiler.ExpectedException;
import com.onthegomap.planetiler.stats.Stats;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
@ -15,7 +16,7 @@ public class WorkerTest {
AtomicInteger counter = new AtomicInteger(0);
var worker = new Worker("prefix", Stats.inMemory(), 4, () -> {
if (counter.incrementAndGet() == 1) {
throw new Error();
throw new ExpectedException();
} else {
Thread.sleep(5000);
}