Offload multipolygon storage to disk (reduce memory usage by 10-15gb) (#141)

pull/147/head
Michael Barry 2022-03-22 20:34:54 -04:00 zatwierdzone przez GitHub
rodzic 4c6d5e6aa7
commit e20b41c88d
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
15 zmienionych plików z 370 dodań i 130 usunięć

Wyświetl plik

@ -195,8 +195,8 @@ public class BasemapProfile extends ForwardingProfile {
@Override
public long estimateRamRequired(long osmFileSize) {
  // 20gb for a 67gb OSM file is safe, although less might be OK too
  // (scale linearly from the measured planet-build ratio)
  return osmFileSize * 20 / 67;
}
/**

Wyświetl plik

@ -2,6 +2,7 @@ package com.onthegomap.planetiler.benchmarks;
import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.collection.LongLongMap;
import com.onthegomap.planetiler.collection.LongLongMultimap;
import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.reader.osm.OsmInputFile;
@ -23,7 +24,8 @@ public class BenchmarkOsmRead {
Timer timer = Timer.start();
try (
var nodes = LongLongMap.noop();
var reader = new OsmReader("osm", file, nodes, profile, stats)
var multipolygons = LongLongMultimap.noop();
var reader = new OsmReader("osm", file, nodes, multipolygons, profile, stats)
) {
reader.pass1(config);
}

Wyświetl plik

@ -2,6 +2,7 @@ package com.onthegomap.planetiler;
import com.onthegomap.planetiler.collection.FeatureGroup;
import com.onthegomap.planetiler.collection.LongLongMap;
import com.onthegomap.planetiler.collection.LongLongMultimap;
import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.config.MbtilesMetadata;
import com.onthegomap.planetiler.config.PlanetilerConfig;
@ -76,6 +77,7 @@ public class Planetiler {
private boolean overwrite = false;
private boolean ran = false;
private Path nodeDbPath;
private Path multipolygonPath;
// most common OSM languages
private List<String> languages = List.of(
"en", "ru", "ar", "zh", "ja", "ko", "fr",
@ -160,7 +162,7 @@ public class Planetiler {
var thisInputFile = new OsmInputFile(path, config.osmLazyReads());
osmInputFile = thisInputFile;
// fail fast if there is some issue with madvise on this system
if (config.nodeMapMadvise()) {
if (config.nodeMapMadvise() || config.multipolygonGeometryMadvise()) {
ByteBufferUtil.init();
}
return appendStage(new Stage(
@ -173,12 +175,15 @@ public class Planetiler {
try (
var nodeLocations =
LongLongMap.from(config.nodeMapType(), config.nodeMapStorage(), nodeDbPath, config.nodeMapMadvise());
var osmReader = new OsmReader(name, thisInputFile, nodeLocations, profile(), stats)
var multipolygonGeometries = LongLongMultimap.newReplaceableMultimap(
config.multipolygonGeometryStorage(), multipolygonPath, config.multipolygonGeometryMadvise());
var osmReader = new OsmReader(name, thisInputFile, nodeLocations, multipolygonGeometries, profile(), stats)
) {
osmReader.pass1(config);
osmReader.pass2(featureGroup, config);
} finally {
FileUtils.delete(nodeDbPath);
FileUtils.delete(multipolygonPath);
}
}))
);
@ -507,6 +512,7 @@ public class Planetiler {
Files.createDirectories(tmpDir);
nodeDbPath = tmpDir.resolve("node.db");
multipolygonPath = tmpDir.resolve("multipolygon.db");
Path featureDbPath = tmpDir.resolve("feature.db");
featureGroup = FeatureGroup.newDiskBackedFeatureGroup(featureDbPath, profile, config, stats);
stats.monitorFile("nodes", nodeDbPath);
@ -541,12 +547,15 @@ public class Planetiler {
ResourceUsage writePhase = new ResourceUsage("write phase disk");
long osmSize = osmInputFile.diskUsageBytes();
long nodeMapSize =
LongLongMap.estimateStorageRequired(config.nodeMapType(), config.nodeMapStorage(), osmSize, tmpDir).diskUsage();
OsmReader.estimateNodeLocationUsage(config.nodeMapType(), config.nodeMapStorage(), osmSize, tmpDir).diskUsage();
long multipolygonGeometrySize =
OsmReader.estimateMultipolygonGeometryUsage(config.multipolygonGeometryStorage(), osmSize, tmpDir).diskUsage();
long featureSize = profile.estimateIntermediateDiskBytes(osmSize);
long outputSize = profile.estimateOutputBytes(osmSize);
// node locations only needed while reading inputs
// node locations and multipolygon geometries only needed while reading inputs
readPhase.addDisk(tmpDir, nodeMapSize, "temporary node location cache");
readPhase.addDisk(tmpDir, multipolygonGeometrySize, "temporary multipolygon geometry cache");
// feature db persists across read/write phase
readPhase.addDisk(tmpDir, featureSize, "temporary feature storage");
writePhase.addDisk(tmpDir, featureSize, "temporary feature storage");
@ -566,25 +575,37 @@ public class Planetiler {
private void checkMemory() {
Format format = Format.defaultInstance();
ResourceUsage check = new ResourceUsage("read phase");
ResourceUsage nodeMapUsages = LongLongMap.estimateStorageRequired(config.nodeMapType(), config.nodeMapStorage(),
ResourceUsage nodeMapUsages = OsmReader.estimateNodeLocationUsage(config.nodeMapType(), config.nodeMapStorage(),
osmInputFile.diskUsageBytes(), tmpDir);
long nodeMapDiskUsage = nodeMapUsages.diskUsage();
ResourceUsage multipolygonGeometryUsages =
OsmReader.estimateMultipolygonGeometryUsage(config.nodeMapStorage(), osmInputFile.diskUsageBytes(), tmpDir);
long memoryMappedFiles = nodeMapUsages.diskUsage() + multipolygonGeometryUsages.diskUsage();
check.addAll(nodeMapUsages)
check
.addAll(nodeMapUsages)
.addAll(multipolygonGeometryUsages)
.addMemory(profile().estimateRamRequired(osmInputFile.diskUsageBytes()), "temporary profile storage");
check.checkAgainstLimits(config().force(), true);
// check off-heap memory if we can get it
ProcessInfo.getSystemFreeMemoryBytes().ifPresent(extraMemory -> {
if (extraMemory < nodeMapDiskUsage) {
LOGGER.warn("""
Planetiler will use a ~%s memory-mapped file for node locations but the OS only has %s available
to cache pages, this may slow the import down. To speed up, run on a machine with more memory or
reduce the -Xmx setting.
""".formatted(
format.storage(nodeMapDiskUsage),
if (extraMemory < memoryMappedFiles) {
LOGGER.warn(
"""
Planetiler will use ~%s memory-mapped files for node locations and multipolygon geometries but the OS only
has %s available to cache pages, this may slow the import down. To speed up, run on a machine with more
memory or reduce the -Xmx setting.
"""
.formatted(
format.storage(memoryMappedFiles),
format.storage(extraMemory)
));
} else {
LOGGER.debug("✓ %s temporary files and %s of free memory for OS to cache them".formatted(
format.storage(memoryMappedFiles),
format.storage(extraMemory)
));
}
});

Wyświetl plik

@ -2,7 +2,6 @@ package com.onthegomap.planetiler.collection;
import com.onthegomap.planetiler.util.DiskBacked;
import com.onthegomap.planetiler.util.MemoryEstimator;
import com.onthegomap.planetiler.util.ResourceUsage;
import java.io.Closeable;
import java.nio.file.Path;
@ -64,39 +63,6 @@ public interface LongLongMap extends Closeable, MemoryEstimator.HasEstimate, Dis
return from(Type.SORTED_TABLE, Storage.RAM, new Storage.Params(Path.of("."), false));
}
/** Estimates the resource requirements for this nodemap for a given OSM input file. */
static ResourceUsage estimateStorageRequired(String name, String storage, long osmFileSize, Path path) {
return estimateStorageRequired(Type.from(name), Storage.from(storage), osmFileSize, path);
}
/** Estimates the resource requirements for this nodemap for a given OSM input file. */
static ResourceUsage estimateStorageRequired(Type type, Storage storage, long osmFileSize, Path path) {
long nodes = estimateNumNodes(osmFileSize);
long maxNodeId = estimateMaxNodeId(osmFileSize);
ResourceUsage check = new ResourceUsage("long long map");
return switch (type) {
case NOOP -> check;
case SPARSE_ARRAY -> check.addMemory(300_000_000L, "sparsearray node location in-memory index")
.add(path, storage, 9 * nodes, "sparsearray node location cache");
case SORTED_TABLE -> check.addMemory(300_000_000L, "sortedtable node location in-memory index")
.add(path, storage, 12 * nodes, "sortedtable node location cache");
case ARRAY -> check.add(path, storage, 8 * maxNodeId,
"array node location cache (switch to sparsearray to reduce size)");
};
}
private static long estimateNumNodes(long osmFileSize) {
// On 2/14/2022, planet.pbf was 66691979646 bytes with ~7.5b nodes, so scale from there
return Math.round(7_500_000_000d * (osmFileSize / 66_691_979_646d));
}
private static long estimateMaxNodeId(long osmFileSize) {
// On 2/14/2022, planet.pbf was 66691979646 bytes and max node ID was ~9.5b, so scale from there
// but don't go less than 9.5b in case it's an extract
return Math.round(9_500_000_000d * Math.max(1, osmFileSize / 66_691_979_646d));
}
/** Returns a longlong map that stores no data and throws on read */
static LongLongMap noop() {
return new ParallelWrites() {

Wyświetl plik

@ -5,21 +5,55 @@ import static com.onthegomap.planetiler.util.MemoryEstimator.estimateSize;
import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.LongIntHashMap;
import com.onthegomap.planetiler.stats.Timer;
import com.onthegomap.planetiler.util.DiskBacked;
import com.onthegomap.planetiler.util.MemoryEstimator;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An in-memory map that stores a multiple {@code long} values for each {@code long} key.
* <p>
* Implementations extend {@link Replaceable} if they support replacing the previous set of values for a key and/or
* {@link Appendable} if they support adding new values for a key.
*/
// TODO: The two implementations should probably not implement the same interface
public interface LongLongMultimap extends MemoryEstimator.HasEstimate {
public interface LongLongMultimap extends MemoryEstimator.HasEstimate, DiskBacked, AutoCloseable {
/** Returns a {@link Noop} implementation that does nothing on put and throws an exception if you try to get. */
static Noop noop() {
return new Noop();
}
/** Returns a new multimap where each write sets the list of values for a key, and that order is preserved on read. */
static Replaceable newReplaceableMultimap(Storage storage, Storage.Params params) {
return new DenseOrderedMultimap(storage, params);
}
/** Returns a new replaceable multimap held in-memory. */
static Replaceable newInMemoryReplaceableMultimap() {
return newReplaceableMultimap(Storage.RAM, null);
}
/** Returns a new multimap where each write adds a value for the given key. */
static Appendable newAppendableMultimap() {
return new SparseUnorderedBinarySearchMultimap();
}
/**
* Writes the value for a key. Not thread safe!
* Returns a new longlong multimap from config strings.
*
* @param storage name of the {@link Storage} implementation to use
* @param path where to store data (if mmap)
* @param madvise whether to use linux madvise random to improve read performance
* @return A longlong map instance
* @throws IllegalArgumentException if {@code name} or {@code storage} is not valid
*/
void put(long key, long value);
static Replaceable newReplaceableMultimap(String storage, Path path, boolean madvise) {
return newReplaceableMultimap(Storage.from(storage), new Storage.Params(path, madvise));
}
/**
* Returns the values for a key. Safe to be called by multiple threads after all values have been written. After the
@ -27,27 +61,70 @@ public interface LongLongMultimap extends MemoryEstimator.HasEstimate {
*/
LongArrayList get(long key);
default void putAll(long key, LongArrayList vals) {
for (int i = 0; i < vals.size(); i++) {
put(key, vals.get(i));
@Override
void close();
@Override
default long diskUsageBytes() {
return 0L;
}
/**
* A map from long to list of longs where you can use {@link #replaceValues(long, LongArrayList)} to set replace the
* previous list of values with a new one.
*/
interface Replaceable extends LongLongMultimap {
/** Replaces the previous list of values for {@code key} with {@code values}. */
void replaceValues(long key, LongArrayList values);
}
/**
* A map from long to list of longs where you can use {@link #put(long, long)} or {@link #putAll(long, LongArrayList)}
* to append values for a key.
*/
interface Appendable extends LongLongMultimap {
/**
* Writes the value for a key. Not thread safe!
*/
void put(long key, long value);
default void putAll(long key, LongArrayList vals) {
for (int i = 0; i < vals.size(); i++) {
put(key, vals.get(i));
}
}
}
/** Returns a new multimap where each write sets the list of values for a key, and that order is preserved on read. */
static LongLongMultimap newDensedOrderedMultimap() {
return new DenseOrderedHppcMultimap();
}
/** Dummy implementation of a map that throws an exception from {@link #get(long)}. */
class Noop implements Replaceable, Appendable {
/** Returns a new multimap where each write adds a value for the given key. */
static LongLongMultimap newSparseUnorderedMultimap() {
return new SparseUnorderedBinarySearchMultimap();
@Override
public void put(long key, long value) {}
@Override
public LongArrayList get(long key) {
throw new UnsupportedOperationException("get(key) not implemented");
}
@Override
public long estimateMemoryUsageBytes() {
return 0;
}
@Override
public void close() {}
@Override
public void replaceValues(long key, LongArrayList values) {}
}
/**
* A map from {@code long} to {@code long} stored as a list of keys and values that uses binary search to find the
* values for a key. Inserts do not need to be ordered, the first read will sort the array.
*/
class SparseUnorderedBinarySearchMultimap implements LongLongMultimap {
class SparseUnorderedBinarySearchMultimap implements Appendable {
private static final Logger LOGGER = LoggerFactory.getLogger(SparseUnorderedBinarySearchMultimap.class);
@ -143,32 +220,43 @@ public interface LongLongMultimap extends MemoryEstimator.HasEstimate {
public long estimateMemoryUsageBytes() {
return estimateSize(keys) + estimateSize(values);
}
@Override
public void close() {
keys.release();
values.release();
}
}
/**
* A map from {@code long} to {@code long} where each putAll replaces previous values and results are returned in the
* same order they were inserted.
*/
class DenseOrderedHppcMultimap implements LongLongMultimap {
class DenseOrderedMultimap implements Replaceable {
private static final LongArrayList EMPTY_LIST = new LongArrayList();
private final LongIntHashMap keyToValuesIndex = Hppc.newLongIntHashMap();
// each block starts with a "length" header then contains that number of entries
private final LongArrayList values = new LongArrayList();
private final AppendStore.Longs values;
@Override
public void putAll(long key, LongArrayList others) {
if (others.isEmpty()) {
return;
}
keyToValuesIndex.put(key, values.size());
values.add(others.size());
values.add(others.buffer, 0, others.size());
public DenseOrderedMultimap(Storage storage, Storage.Params params) {
values = switch (storage) {
case MMAP -> new AppendStoreMmap.Longs(params);
case RAM -> new AppendStoreRam.Longs(false);
case DIRECT -> new AppendStoreRam.Longs(true);
};
}
@Override
public void put(long key, long val) {
putAll(key, LongArrayList.from(val));
public void replaceValues(long key, LongArrayList values) {
if (values.isEmpty()) {
return;
}
keyToValuesIndex.put(key, (int) this.values.size());
this.values.appendLong(values.size());
for (int i = 0; i < values.size(); i++) {
this.values.appendLong(values.get(i));
}
}
@Override
@ -176,8 +264,10 @@ public interface LongLongMultimap extends MemoryEstimator.HasEstimate {
int index = keyToValuesIndex.getOrDefault(key, -1);
if (index >= 0) {
LongArrayList result = new LongArrayList();
int num = (int) values.get(index);
result.add(values.buffer, index + 1, num);
int num = (int) values.getLong(index);
for (int i = 0; i < num; i++) {
result.add(values.getLong(i + index + 1));
}
return result;
} else {
return EMPTY_LIST;
@ -188,5 +278,20 @@ public interface LongLongMultimap extends MemoryEstimator.HasEstimate {
public long estimateMemoryUsageBytes() {
return estimateSize(keyToValuesIndex) + estimateSize(values);
}
@Override
public long diskUsageBytes() {
return values.diskUsageBytes();
}
@Override
public void close() {
keyToValuesIndex.release();
try {
values.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
}

Wyświetl plik

@ -155,12 +155,12 @@ public class Arguments {
});
}
private String getArg(String key) {
String getArg(String key) {
String value = get(key);
return value == null ? null : value.trim();
}
private String getArg(String key, String defaultValue) {
String getArg(String key, String defaultValue) {
String value = getArg(key);
return value == null ? defaultValue : value;
}

Wyświetl plik

@ -25,6 +25,8 @@ public record PlanetilerConfig(
String nodeMapType,
String nodeMapStorage,
boolean nodeMapMadvise,
String multipolygonGeometryStorage,
boolean multipolygonGeometryMadvise,
String httpUserAgent,
Duration httpTimeout,
int httpRetries,
@ -60,6 +62,18 @@ public record PlanetilerConfig(
}
public static PlanetilerConfig from(Arguments arguments) {
// use --madvise and --storage options as default for temp storage, but allow users to override them explicitly for
// multipolygon geometries or node locations
boolean defaultMadvise =
arguments.getBoolean("madvise",
"default value for whether to use linux madvise(random) to improve memory-mapped read performance for temporary storage",
true);
// nodemap_storage was previously the only option, so if that's set make it the default
String fallbackTempStorage = arguments.getArg("nodemap_storage", Storage.MMAP.id());
String defaultTempStorage = arguments.getString("storage",
"default storage type for temporary data, one of " + Stream.of(Storage.values()).map(
Storage::id).toList(),
fallbackTempStorage);
return new PlanetilerConfig(
arguments,
new Bounds(arguments.bounds("bounds", "bounds")),
@ -79,9 +93,15 @@ public record PlanetilerConfig(
arguments
.getString("nodemap_type", "type of node location map, one of " + Stream.of(LongLongMap.Type.values()).map(
t -> t.id()).toList(), LongLongMap.Type.SPARSE_ARRAY.id()),
arguments.getString("nodemap_storage", "storage for location map, one of " + Stream.of(Storage.values()).map(
Storage::id).toList(), Storage.MMAP.id()),
arguments.getBoolean("nodemap_madvise", "use linux madvise(random) to improve memory-mapped read performance",
arguments.getString("nodemap_storage", "storage for node location map, one of " + Stream.of(Storage.values()).map(
Storage::id).toList(), defaultTempStorage),
arguments.getBoolean("nodemap_madvise", "use linux madvise(random) for node locations", defaultMadvise),
arguments.getString("multipolygon_geometry_storage",
"storage for multipolygon geometries, one of " + Stream.of(Storage.values()).map(
Storage::id).toList(),
defaultTempStorage),
arguments.getBoolean("multipolygon_geometry_madvise",
"use linux madvise(random) for temporary multipolygon geometry storage",
false),
arguments.getString("http_user_agent", "User-Agent header to set when downloading files over HTTP",
"Planetiler downloader (https://github.com/onthegomap/planetiler)"),

Wyświetl plik

@ -15,6 +15,7 @@ import com.onthegomap.planetiler.collection.Hppc;
import com.onthegomap.planetiler.collection.LongLongMap;
import com.onthegomap.planetiler.collection.LongLongMultimap;
import com.onthegomap.planetiler.collection.SortableFeature;
import com.onthegomap.planetiler.collection.Storage;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.geo.GeoUtils;
import com.onthegomap.planetiler.geo.GeometryException;
@ -26,12 +27,14 @@ import com.onthegomap.planetiler.stats.ProgressLoggers;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.util.Format;
import com.onthegomap.planetiler.util.MemoryEstimator;
import com.onthegomap.planetiler.util.ResourceUsage;
import com.onthegomap.planetiler.worker.Distributor;
import com.onthegomap.planetiler.worker.WeightedHandoffQueue;
import com.onthegomap.planetiler.worker.WorkQueue;
import com.onthegomap.planetiler.worker.WorkerPipeline;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -79,14 +82,14 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
// <~500mb
private LongObjectHashMap<OsmRelationInfo> relationInfo = Hppc.newLongObjectHashMap();
// ~800mb, ~1.6GB when sorting
private LongLongMultimap wayToRelations = LongLongMultimap.newSparseUnorderedMultimap();
private LongLongMultimap.Appendable wayToRelations = LongLongMultimap.newAppendableMultimap();
private final Object wayToRelationsLock = new Object();
// for multipolygons need to store way info (20m ways, 800m nodes) to use when processing relations (4.5m)
// ~300mb
private LongHashSet waysInMultipolygon = new LongHashSet();
private final Object waysInMultipolygonLock = new Object();
// ~7GB
private LongLongMultimap multipolygonWayGeometries = LongLongMultimap.newDensedOrderedMultimap();
private LongLongMultimap.Replaceable multipolygonWayGeometries;
// keep track of data needed to encode/decode role strings into a long
private final ObjectIntHashMap<String> roleIds = new ObjectIntHashMap<>();
private final IntObjectHashMap<String> roleIdsReverse = new IntObjectHashMap<>();
@ -97,15 +100,16 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
* Constructs a new {@code OsmReader} from an {@code osmSourceProvider} that will use {@code nodeLocationDb} as a
* temporary store for node locations.
*
* @param name ID for this reader to use in stats and logs
* @param osmSourceProvider the file to read raw nodes, ways, and relations from
* @param nodeLocationDb store that will temporarily hold node locations (encoded as a long) between passes to
* reconstruct way geometries
* @param profile logic that defines what map features to emit for each source feature
* @param stats to keep track of counters and timings
* @param name ID for this reader to use in stats and logs
* @param osmSourceProvider the file to read raw nodes, ways, and relations from
* @param nodeLocationDb store that will temporarily hold node locations (encoded as a long) between passes to
* reconstruct way geometries
* @param multipolygonGeometries store that will temporarily hold multipolygon way geometries
* @param profile logic that defines what map features to emit for each source feature
* @param stats to keep track of counters and timings
*/
public OsmReader(String name, Supplier<OsmBlockSource> osmSourceProvider, LongLongMap nodeLocationDb, Profile profile,
Stats stats) {
public OsmReader(String name, Supplier<OsmBlockSource> osmSourceProvider, LongLongMap nodeLocationDb,
LongLongMultimap.Replaceable multipolygonGeometries, Profile profile, Stats stats) {
this.name = name;
this.osmBlockSource = osmSourceProvider.get();
this.nodeLocationDb = nodeLocationDb;
@ -118,6 +122,16 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
"ways", pass1Phaser::ways,
"relations", pass1Phaser::relations
));
this.multipolygonWayGeometries = multipolygonGeometries;
}
/**
* Alias for {@link #OsmReader(String, Supplier, LongLongMap, LongLongMultimap.Replaceable, Profile, Stats)} that sets
* the multipolygon geometry multimap to a default in-memory implementation.
*/
public OsmReader(String name, Supplier<OsmBlockSource> osmSourceProvider, LongLongMap nodeLocationDb, Profile profile,
Stats stats) {
this(name, osmSourceProvider, nodeLocationDb, LongLongMultimap.newInMemoryReplaceableMultimap(), profile, stats);
}
/**
@ -308,7 +322,7 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
var pipeline = WorkerPipeline.start("osm_pass2", stats)
.fromGenerator("read", osmBlockSource::forEachBlock)
.addBuffer("pbf_blocks", 100)
.addBuffer("pbf_blocks", Math.max(10, threads / 2))
.<SortableFeature>addWorker("process", processThreads, (prev, next) -> {
// avoid contention trying to get the thread-local counters by getting them once when thread starts
Counter blocks = blocksProcessed.counterForThread();
@ -366,7 +380,8 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
.addRatePercentCounter("blocks", PASS1_BLOCKS.get(), blocksProcessed, false)
.newLine()
.addProcessStats()
.addInMemoryObject("hppc", this)
.addInMemoryObject("relInfo", this)
.addFileSizeAndRam("mpGeoms", multipolygonWayGeometries)
.newLine()
.addPipelineStats(pipeline);
@ -386,6 +401,57 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
}
}
/** Estimates the resource requirements for a nodemap but parses the type/storage from strings. */
public static ResourceUsage estimateNodeLocationUsage(String type, String storage, long osmFileSize, Path path) {
return estimateNodeLocationUsage(LongLongMap.Type.from(type), Storage.from(storage), osmFileSize, path);
}
/** Estimates the resource requirements for a nodemap for a given OSM input file. */
public static ResourceUsage estimateNodeLocationUsage(LongLongMap.Type type, Storage storage, long osmFileSize,
Path path) {
long nodes = estimateNumNodes(osmFileSize);
long maxNodeId = estimateMaxNodeId(osmFileSize);
ResourceUsage check = new ResourceUsage("nodemap");
return switch (type) {
case NOOP -> check;
case SPARSE_ARRAY -> check.addMemory(300_000_000L, "sparsearray node location in-memory index")
.add(path, storage, 9 * nodes, "sparsearray node location cache");
case SORTED_TABLE -> check.addMemory(300_000_000L, "sortedtable node location in-memory index")
.add(path, storage, 12 * nodes, "sortedtable node location cache");
case ARRAY -> check.add(path, storage, 8 * maxNodeId,
"array node location cache (switch to sparsearray to reduce size)");
};
}
/**
* Estimates the resource requirements for a multipolygon geometry multimap but parses the type/storage from strings.
*/
public static ResourceUsage estimateMultipolygonGeometryUsage(String storage, long osmFileSize, Path path) {
return estimateMultipolygonGeometryUsage(Storage.from(storage), osmFileSize, path);
}
/** Estimates the resource requirements for a multipolygon geometry multimap for a given OSM input file. */
public static ResourceUsage estimateMultipolygonGeometryUsage(Storage storage, long osmFileSize, Path path) {
// Massachusetts extract (260MB) requires about 20MB for way geometries
long estimatedSize = 20_000_000L * osmFileSize / 260_000_000L;
return new ResourceUsage("way geometry multipolygon")
.add(path, storage, estimatedSize, "multipolygon way geometries");
}
private static long estimateNumNodes(long osmFileSize) {
// On 2/14/2022, planet.pbf was 66691979646 bytes with ~7.5b nodes, so scale from there
return Math.round(7_500_000_000d * (osmFileSize / 66_691_979_646d));
}
private static long estimateMaxNodeId(long osmFileSize) {
// On 2/14/2022, planet.pbf was 66691979646 bytes and max node ID was ~9.5b, so scale from there
// but don't go less than 9.5b in case it's an extract
return Math.round(9_500_000_000d * Math.max(1, osmFileSize / 66_691_979_646d));
}
private void render(FeatureCollector.Factory featureCollectors, FeatureRenderer renderer, OsmElement element,
SourceFeature feature) {
FeatureCollector features = featureCollectors.get(feature);
@ -423,7 +489,7 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
// if this is part of a multipolygon, store the node IDs for this way ID so that when
// we get to the multipolygon we can go from way IDs -> node IDs -> node locations.
synchronized (this) { // multiple threads may update this concurrently
multipolygonWayGeometries.putAll(way.id(), nodes);
multipolygonWayGeometries.replaceValues(way.id(), nodes);
}
}
boolean closed = nodes.size() > 1 && nodes.get(0) == nodes.get(nodes.size() - 1);
@ -466,7 +532,7 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
public long estimateMemoryUsageBytes() {
long size = 0;
size += estimateSize(waysInMultipolygon);
size += estimateSize(multipolygonWayGeometries);
// multipolygonWayGeometries is reported separately
size += estimateSize(wayToRelations);
size += estimateSize(relationInfo);
size += estimateSize(roleIdsReverse);
@ -478,7 +544,10 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
@Override
public void close() throws IOException {
multipolygonWayGeometries = null;
if (multipolygonWayGeometries != null) {
multipolygonWayGeometries.close();
multipolygonWayGeometries = null;
}
wayToRelations = null;
waysInMultipolygon = null;
relationInfo = null;

Wyświetl plik

@ -227,11 +227,16 @@ public class ProgressLoggers {
return add(() -> " " + padRight(format.storage(longSupplier.diskUsageBytes(), false), 5));
}
/** Adds the total of disk and memory usage of {@code thing}. */
/** Adds the name and total of disk and memory usage of {@code thing}. */
public <T extends DiskBacked & MemoryEstimator.HasEstimate> ProgressLoggers addFileSizeAndRam(T thing) {
return addFileSizeAndRam("", thing);
}
/** Adds the total of disk and memory usage of {@code thing}. */
public <T extends DiskBacked & MemoryEstimator.HasEstimate> ProgressLoggers addFileSizeAndRam(String name, T thing) {
return add(() -> {
long bytes = thing.diskUsageBytes() + thing.estimateMemoryUsageBytes();
return " " + padRight(format.storage(bytes, false), 5);
return " " + name + (name.isBlank() ? "" : ": ") + padRight(format.storage(bytes, false), 5);
});
}

Wyświetl plik

@ -13,6 +13,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -25,6 +26,7 @@ public class Worker {
private static final Logger LOGGER = LoggerFactory.getLogger(Worker.class);
private final String prefix;
private final CompletableFuture<?> done;
private static final AtomicBoolean firstWorkerDied = new AtomicBoolean(false);
/**
* Constructs a new worker and immediately starts {@code threads} thread all running {@code task}.
@ -51,7 +53,10 @@ public class Worker {
stats.timers().finishedWorker(prefix, Duration.ofNanos(System.nanoTime() - start));
} catch (Throwable e) {
System.err.println("Worker " + id + " died");
e.printStackTrace();
// when one worker dies it may close resources causing others to die as well, so only log the first
if (firstWorkerDied.compareAndSet(false, true)) {
e.printStackTrace();
}
throwRuntimeException(e);
} finally {
LOGGER.trace("Finished worker");

Wyświetl plik

@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.*;
import com.onthegomap.planetiler.collection.FeatureGroup;
import com.onthegomap.planetiler.collection.LongLongMap;
import com.onthegomap.planetiler.collection.LongLongMultimap;
import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.config.MbtilesMetadata;
import com.onthegomap.planetiler.config.PlanetilerConfig;
@ -104,7 +105,8 @@ public class PlanetilerTests {
next.accept(OsmBlockSource.Block.of(osmElements.stream().filter(e -> e instanceof OsmElement.Relation).toList()));
};
var nodeMap = LongLongMap.newInMemorySortedTable();
try (var reader = new OsmReader("osm", () -> elems, nodeMap, profile, Stats.inMemory())) {
var multipolygons = LongLongMultimap.newInMemoryReplaceableMultimap();
try (var reader = new OsmReader("osm", () -> elems, nodeMap, multipolygons, profile, Stats.inMemory())) {
reader.pass1(config);
reader.pass2(featureGroup, config);
}
@ -777,7 +779,8 @@ public class PlanetilerTests {
OsmBlockSource.Block.of(osmElements.stream().filter(e -> e instanceof OsmElement.Relation).toList()));
};
var nodeMap = LongLongMap.newInMemorySortedTable();
try (var reader = new OsmReader("osm", () -> elems, nodeMap, profile, Stats.inMemory())) {
var multipolygons = LongLongMultimap.newInMemoryReplaceableMultimap();
try (var reader = new OsmReader("osm", () -> elems, nodeMap, multipolygons, profile, Stats.inMemory())) {
// skip pass 1
reader.pass2(featureGroup, config);
}

Wyświetl plik

@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.onthegomap.planetiler.reader.osm.OsmReader;
import com.onthegomap.planetiler.util.Format;
import com.onthegomap.planetiler.util.ResourceUsage;
import java.nio.file.Path;
@ -127,7 +128,7 @@ public abstract class LongLongMapTest {
for (LongLongMap.Type type : LongLongMap.Type.values()) {
var variant = storage + "-" + type;
var params = new Storage.Params(path.resolve(variant), true);
var estimatedSize = LongLongMap.estimateStorageRequired(type, storage, 70_000_000_000L, params.path());
var estimatedSize = OsmReader.estimateNodeLocationUsage(type, storage, 70_000_000_000L, params.path());
var usage = storage == Storage.MMAP ? estimatedSize.diskUsage() :
estimatedSize.get(
storage == Storage.DIRECT ? ResourceUsage.DIRECT_MEMORY : ResourceUsage.HEAP

Wyświetl plik

@ -4,9 +4,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.carrotsearch.hppc.LongArrayList;
import java.nio.file.Path;
import java.util.Arrays;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
public abstract class LongLongMultimapTest {
@ -20,16 +23,36 @@ public abstract class LongLongMultimapTest {
@Test
// NOTE(review): this span looks like a diff rendering with the +/- markers stripped:
// `map.put(1, 1);` appears to be the pre-change line and `put(1, 1);` its replacement.
// As written both calls would execute — confirm against the real source before compiling.
public void oneValue() {
map.put(1, 1);
put(1, 1);
assertResultLists(LongArrayList.from(), map.get(0));
assertResultLists(LongArrayList.from(1), map.get(1));
assertResultLists(LongArrayList.from(), map.get(2));
}
/**
 * Inserts a single key/value pair through whichever mutation interface the map under test
 * supports: {@code replaceValues} for a {@link LongLongMultimap.Replaceable}, {@code put}
 * for a {@link LongLongMultimap.Appendable}.
 *
 * @throws UnsupportedOperationException if the map implements neither interface
 */
private void put(int k, int v) {
  // Check Replaceable first to keep the original dispatch order.
  if (map instanceof LongLongMultimap.Replaceable replaceable) {
    replaceable.replaceValues(k, LongArrayList.from(v));
    return;
  }
  if (map instanceof LongLongMultimap.Appendable appendable) {
    appendable.put(k, v);
    return;
  }
  throw new UnsupportedOperationException(map.getClass().getCanonicalName());
}
/**
 * Inserts all values for a key through whichever mutation interface the map under test
 * supports: {@code replaceValues} for a {@link LongLongMultimap.Replaceable},
 * {@code putAll} for a {@link LongLongMultimap.Appendable}.
 *
 * @throws UnsupportedOperationException if the map implements neither interface
 */
private void putAll(long k, LongArrayList vs) {
  // Check Replaceable first to keep the original dispatch order.
  if (map instanceof LongLongMultimap.Replaceable replaceable) {
    replaceable.replaceValues(k, vs);
    return;
  }
  if (map instanceof LongLongMultimap.Appendable appendable) {
    appendable.putAll(k, vs);
    return;
  }
  throw new UnsupportedOperationException(map.getClass().getCanonicalName());
}
@Test
public void twoConsecutiveValues() {
map.put(1, 1);
map.put(2, 2);
put(1, 1);
put(2, 2);
assertResultLists(LongArrayList.from(), map.get(0));
assertResultLists(LongArrayList.from(1), map.get(1));
assertResultLists(LongArrayList.from(2), map.get(2));
@ -38,8 +61,8 @@ public abstract class LongLongMultimapTest {
@Test
public void twoNonconsecutiveValues() {
map.put(1, 1);
map.put(3, 3);
put(1, 1);
put(3, 3);
assertResultLists(LongArrayList.from(), map.get(0));
assertResultLists(LongArrayList.from(1), map.get(1));
assertResultLists(LongArrayList.from(), map.get(2));
@ -52,15 +75,15 @@ public abstract class LongLongMultimapTest {
if (retainInputOrder) {
return;
}
map.put(3, 31);
map.put(2, 21);
map.put(1, 11);
map.put(1, 12);
map.put(2, 22);
map.put(3, 32);
map.put(3, 33);
map.put(2, 23);
map.put(1, 13);
put(3, 31);
put(2, 21);
put(1, 11);
put(1, 12);
put(2, 22);
put(3, 32);
put(3, 33);
put(2, 23);
put(1, 13);
assertResultLists(LongArrayList.from(11, 12, 13), map.get(1));
assertResultLists(LongArrayList.from(21, 22, 23), map.get(2));
assertResultLists(LongArrayList.from(31, 32, 33), map.get(3));
@ -74,7 +97,7 @@ public abstract class LongLongMultimapTest {
for (int j = 0; j < 10; j++) {
toInsert[j] = i * 10 + j + 1;
}
map.putAll(i, LongArrayList.from(toInsert));
putAll(i, LongArrayList.from(toInsert));
}
for (int i = 0; i < 100; i++) {
assertResultLists(LongArrayList.from(
@ -107,7 +130,7 @@ public abstract class LongLongMultimapTest {
@Test
public void manyInsertsUnordered() {
for (long i = 99; i >= 0; i--) {
map.putAll(i, LongArrayList.from(
putAll(i, LongArrayList.from(
i * 10 + 10,
i * 10 + 9,
i * 10 + 8,
@ -138,8 +161,8 @@ public abstract class LongLongMultimapTest {
@Test
public void multiInsert() {
map.putAll(1, LongArrayList.from(1, 2, 3));
map.put(0, 3);
putAll(1, LongArrayList.from(1, 2, 3));
put(0, 3);
assertResultLists(LongArrayList.from(3), map.get(0));
assertResultLists(LongArrayList.from(1, 2, 3), map.get(1));
assertResultLists(LongArrayList.from(), map.get(2));
@ -149,7 +172,7 @@ public abstract class LongLongMultimapTest {
@BeforeEach
// NOTE(review): two consecutive assignments to this.map — this looks like a diff with the
// +/- markers lost: `newSparseUnorderedMultimap()` is presumably the pre-change line and
// `newAppendableMultimap()` the replacement. Only the second would take effect as written;
// confirm against the actual source.
public void setup() {
this.map = LongLongMultimap.newSparseUnorderedMultimap();
this.map = LongLongMultimap.newAppendableMultimap();
}
}
@ -158,7 +181,23 @@ public abstract class LongLongMultimapTest {
@BeforeEach
public void setup() {
retainInputOrder = true;
this.map = LongLongMultimap.newDensedOrderedMultimap();
this.map =
LongLongMultimap.newInMemoryReplaceableMultimap();
}
}
/**
 * Runs the shared {@code LongLongMultimapTest} suite against the disk-backed
 * ({@code Storage.MMAP}) replaceable multimap variant, which preserves input order
 * ({@code retainInputOrder = true}).
 */
public static class DenseOrderedMmapTest extends LongLongMultimapTest {
@BeforeEach
public void setup(@TempDir Path dir) {
retainInputOrder = true;
this.map =
LongLongMultimap.newReplaceableMultimap(Storage.MMAP, new Storage.Params(dir.resolve("multimap"), true));
}
// Close the multimap after each test so its backing resources (the memory-mapped file)
// are released before JUnit deletes the @TempDir directory.
@AfterEach
public void teardown() {
this.map.close();
}
}
}

Wyświetl plik

@ -9,6 +9,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.TestUtils;
import com.onthegomap.planetiler.collection.LongLongMap;
import com.onthegomap.planetiler.collection.LongLongMultimap;
import com.onthegomap.planetiler.geo.GeoUtils;
import com.onthegomap.planetiler.geo.GeometryException;
import com.onthegomap.planetiler.reader.SourceFeature;
@ -28,6 +29,7 @@ public class OsmReaderTest {
private final Stats stats = Stats.inMemory();
private final Profile profile = new Profile.NullProfile();
private final LongLongMap nodeMap = LongLongMap.newInMemorySortedTable();
private final LongLongMultimap.Replaceable multipolygons = LongLongMultimap.newInMemoryReplaceableMultimap();
private void processPass1Block(OsmReader reader, Iterable<OsmElement> block) {
reader.processPass1Blocks(List.of(block));
@ -608,7 +610,7 @@ public class OsmReaderTest {
public void testWayInRelation() {
record OtherRelInfo(long id) implements OsmRelationInfo {}
record TestRelInfo(long id, String name) implements OsmRelationInfo {}
OsmReader reader = new OsmReader("osm", () -> osmSource, nodeMap, new Profile.NullProfile() {
OsmReader reader = new OsmReader("osm", () -> osmSource, nodeMap, multipolygons, new Profile.NullProfile() {
@Override
public List<OsmRelationInfo> preprocessOsmRelation(OsmElement.Relation relation) {
return List.of(new TestRelInfo(1, "name"));
@ -635,7 +637,7 @@ public class OsmReaderTest {
@Test
public void testNodeOrWayRelationInRelationDoesntTriggerWay() {
record TestRelInfo(long id, String name) implements OsmRelationInfo {}
OsmReader reader = new OsmReader("osm", () -> osmSource, nodeMap, new Profile.NullProfile() {
OsmReader reader = new OsmReader("osm", () -> osmSource, nodeMap, multipolygons, new Profile.NullProfile() {
@Override
public List<OsmRelationInfo> preprocessOsmRelation(OsmElement.Relation relation) {
return List.of(new TestRelInfo(1, "name"));
@ -659,6 +661,6 @@ public class OsmReaderTest {
}
private OsmReader newOsmReader() {
// NOTE(review): two sequential return statements — this looks like a diff with the +/-
// markers stripped; the first line is the pre-change call and the second adds the new
// `multipolygons` argument. The second return would be unreachable (a compile error) as
// written; only the multipolygons variant should remain in the real source.
return new OsmReader("osm", () -> osmSource, nodeMap, profile, stats);
return new OsmReader("osm", () -> osmSource, nodeMap, multipolygons, profile, stats);
}
}

Wyświetl plik

@ -4,6 +4,7 @@ import com.onthegomap.planetiler.Planetiler;
import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.collection.FeatureGroup;
import com.onthegomap.planetiler.collection.LongLongMap;
import com.onthegomap.planetiler.collection.LongLongMultimap;
import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.config.MbtilesMetadata;
import com.onthegomap.planetiler.config.PlanetilerConfig;
@ -88,7 +89,8 @@ public class ToiletsOverlayLowLevelApi {
* any node locations.
*/
var nodeLocations = LongLongMap.noop();
var osmReader = new OsmReader("osm", new OsmInputFile(input), nodeLocations, profile, stats)
var multipolygons = LongLongMultimap.noop();
var osmReader = new OsmReader("osm", new OsmInputFile(input), nodeLocations, multipolygons, profile, stats)
) {
// Normally you need to run OsmReader.pass1(config) first which stores node locations and preprocesses relations for
// way processing, and counts elements. But since this profile only processes nodes we can skip pass 1.