Custom PBF parsing (#82)

Pull in pbf parsing to give more control over threading model and performance.
pull/102/head
Michael Barry 2022-02-28 20:52:30 -05:00 committed by GitHub
parent 7e8596b0e1
commit 2f05f942b2
No key was found in the database for this signature
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 1842 additions and 728 deletions

Wyświetl plik

@ -27,6 +27,8 @@ The `planetiler-core` module includes the following software:
- `VectorTileEncoder`
from [java-vector-tile](https://github.com/ElectronicChartCentre/java-vector-tile) (Apache license)
- `Imposm3Parsers` from [imposm3](https://github.com/omniscale/imposm3) (Apache license)
- `PbfDecoder` from [osmosis](https://github.com/openstreetmap/osmosis) (Public Domain)
- `PbfFieldDecoder` from [osmosis](https://github.com/openstreetmap/osmosis) (Public Domain)
Additionally, the `planetiler-basemap` module is based on [OpenMapTiles](https://github.com/openmaptiles/openmaptiles):

Wyświetl plik

@ -1,17 +1,14 @@
package com.onthegomap.planetiler.benchmarks;
import com.graphhopper.reader.ReaderElementUtils;
import com.graphhopper.reader.ReaderNode;
import com.graphhopper.reader.ReaderRelation;
import com.graphhopper.reader.ReaderWay;
import com.onthegomap.planetiler.basemap.BasemapProfile;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.expression.MultiExpression;
import com.onthegomap.planetiler.reader.SourceFeature;
import com.onthegomap.planetiler.reader.osm.OsmElement;
import com.onthegomap.planetiler.reader.osm.OsmInputFile;
import com.onthegomap.planetiler.stats.ProgressLoggers;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.util.Translations;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
@ -25,47 +22,53 @@ import org.locationtech.jts.geom.Geometry;
*/
public class BasemapMapping {
public static void main(String[] args) throws IOException {
public static void main(String[] args) throws Exception {
var profile = new BasemapProfile(Translations.nullProvider(List.of()), PlanetilerConfig.defaults(),
Stats.inMemory());
var random = new Random(0);
var input = new OsmInputFile(Path.of("data", "sources", "north-america_us_massachusetts.pbf"));
List<SourceFeature> inputs = new ArrayList<>();
input.readTo(readerElem -> {
if (random.nextDouble() < 0.2) {
if (inputs.size() % 1_000_000 == 0) {
System.err.println(inputs.size());
var logger = ProgressLoggers.create()
.addRateCounter("inputs", inputs::size)
.addProcessStats();
try (var reader = OsmInputFile.readFrom(Path.of("data", "sources", "massachusetts.osm.pbf"))) {
reader.forEachBlock(block -> {
for (var element : block.decodeElements()) {
if (random.nextDouble() < 0.2) {
if (inputs.size() % 1_000_000 == 0) {
logger.log();
}
inputs.add(new SourceFeature(element.tags(), "", "", null, element.id()) {
@Override
public Geometry latLonGeometry() {
return null;
}
@Override
public Geometry worldGeometry() {
return null;
}
@Override
public boolean isPoint() {
return element instanceof OsmElement.Node;
}
@Override
public boolean canBePolygon() {
return element instanceof OsmElement.Way || element instanceof OsmElement.Relation;
}
@Override
public boolean canBeLine() {
return element instanceof OsmElement.Way;
}
});
}
}
var props = ReaderElementUtils.getTags(readerElem);
inputs.add(new SourceFeature(props, "", "", null, readerElem.getId()) {
@Override
public Geometry latLonGeometry() {
return null;
}
@Override
public Geometry worldGeometry() {
return null;
}
@Override
public boolean isPoint() {
return readerElem instanceof ReaderNode;
}
@Override
public boolean canBePolygon() {
return readerElem instanceof ReaderWay || readerElem instanceof ReaderRelation;
}
@Override
public boolean canBeLine() {
return readerElem instanceof ReaderWay;
}
});
}
}, "reader", 3);
});
}
logger.log();
System.err.println("read " + inputs.size() + " elems");
long startStart = System.nanoTime();
@ -79,8 +82,10 @@ public class BasemapMapping {
}
if (count == 0) {
startStart = System.nanoTime();
logger.log();
System.err.println("finished warmup");
} else {
logger.log();
System.err.println(
"took:" + Duration.ofNanos(System.nanoTime() - start).toMillis() + "ms found:" + i + " avg:" + (Duration
.ofNanos(System.nanoTime() - startStart).toMillis() / count) + "ms");

Wyświetl plik

@ -0,0 +1,33 @@
package com.onthegomap.planetiler.benchmarks;
import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.collection.LongLongMap;
import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.reader.osm.OsmInputFile;
import com.onthegomap.planetiler.reader.osm.OsmReader;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.stats.Timer;
import java.io.IOException;
import java.nio.file.Path;
/**
 * Micro-benchmark that repeatedly performs an OSM pass-1 read over a fixed local input file and
 * prints the elapsed time of each iteration to stderr.
 */
public class BenchmarkOsmRead {

  public static void main(String[] args) throws IOException {
    var input = new OsmInputFile(Path.of("data/sources/northeast.osm.pbf"), true);
    var nullProfile = new Profile.NullProfile();
    var inMemoryStats = Stats.inMemory();
    var config = PlanetilerConfig.from(Arguments.of());
    // Runs forever by design: each iteration is one timed pass over the input file.
    for (;;) {
      var timer = Timer.start();
      try (
        var nodeLocations = LongLongMap.noop();
        var reader = new OsmReader("osm", input, nodeLocations, nullProfile, inMemoryStats)
      ) {
        reader.pass1(config);
      }
      System.err.println(timer.stop());
    }
  }
}

Wyświetl plik

@ -35,7 +35,7 @@ public class LongLongMapBench {
}
LocalCounter counter = new LocalCounter();
ProgressLoggers loggers = ProgressLoggers.create()
.addRatePercentCounter("entries", entries, () -> counter.count)
.addRatePercentCounter("entries", entries, () -> counter.count, true)
.newLine()
.addProcessStats();
AtomicReference<String> writeRate = new AtomicReference<>();

Wyświetl plik

@ -1,13 +0,0 @@
package com.graphhopper.reader;
import java.util.Map;
/**
 * Utility to gain access to protected method {@link ReaderElement#getTags()}.
 * <p>
 * Lives in the {@code com.graphhopper.reader} package so that the protected method is accessible.
 */
public class ReaderElementUtils {

  private ReaderElementUtils() {
    // static utility class, not meant to be instantiated
  }

  /** Returns the tag map of {@code elem}, which {@link ReaderElement} only exposes to subclasses. */
  public static Map<String, Object> getTags(ReaderElement elem) {
    return elem.getTags();
  }
}

Wyświetl plik

@ -150,7 +150,7 @@ public class Planetiler {
throw new IllegalArgumentException("Currently only one OSM input file is supported");
}
Path path = getPath(name, "OSM input file", defaultPath, defaultUrl);
var thisInputFile = new OsmInputFile(path);
var thisInputFile = new OsmInputFile(path, config.osmLazyReads());
osmInputFile = thisInputFile;
return appendStage(new Stage(
name,

Wyświetl plik

@ -14,18 +14,27 @@ public interface IterableOnce<T> extends Iterable<T>, Supplier<T> {
@Override
default Iterator<T> iterator() {
return new Iterator<>() {
T next = get();
T next = null;
boolean stale = true;
private void advance() {
if (stale) {
next = get();
stale = false;
}
}
@Override
public boolean hasNext() {
advance();
return next != null;
}
@Override
public T next() {
T result = next;
next = get();
return result;
advance();
stale = true;
return next;
}
};
}

Wyświetl plik

@ -157,6 +157,7 @@ public interface LongLongMap extends Closeable, MemoryEstimator.HasEstimate, Dis
private final AppendStore.Longs keys;
private final AppendStore.Longs values;
private long lastChunk = -1;
private long lastKey = -1;
public SortedTable(AppendStore.Longs keys, AppendStore.Longs values) {
this.keys = keys;
@ -165,6 +166,10 @@ public interface LongLongMap extends Closeable, MemoryEstimator.HasEstimate, Dis
@Override
public void put(long key, long value) {
if (key <= lastKey) {
throw new IllegalArgumentException("Nodes must be sorted ascending by ID, " + key + " came after " + lastKey);
}
lastKey = key;
long idx = keys.size();
long chunk = key >>> 8;
if (chunk != lastChunk) {
@ -236,6 +241,7 @@ public interface LongLongMap extends Closeable, MemoryEstimator.HasEstimate, Dis
private final AppendStore.Longs values;
private int lastChunk = -1;
private int lastOffset = 0;
private long lastKey = -1;
public SparseArray(AppendStore.Longs values) {
this.values = values;
@ -243,6 +249,10 @@ public interface LongLongMap extends Closeable, MemoryEstimator.HasEstimate, Dis
@Override
public void put(long key, long value) {
if (key <= lastKey) {
throw new IllegalArgumentException("Nodes must be sorted ascending by ID, " + key + " came after " + lastKey);
}
lastKey = key;
long idx = values.size();
int chunk = (int) (key >>> 8);
int offset = (int) (key & 255);

Wyświetl plik

@ -28,7 +28,8 @@ public record PlanetilerConfig(
double minFeatureSizeAtMaxZoom,
double minFeatureSizeBelowMaxZoom,
double simplifyToleranceAtMaxZoom,
double simplifyToleranceBelowMaxZoom
double simplifyToleranceBelowMaxZoom,
boolean osmLazyReads
) {
public static final int MIN_MINZOOM = 0;
@ -86,7 +87,10 @@ public record PlanetilerConfig(
256d / 4096),
arguments.getDouble("simplify_tolerance",
"Default value for the tile pixel tolerance to use when simplifying features below the maximum zoom level",
0.1d)
0.1d),
arguments.getBoolean("osm_lazy_reads",
"Read OSM blocks from disk in worker threads",
false)
);
}

Wyświetl plik

@ -121,7 +121,7 @@ public class MbtilesWriter {
*/
WorkQueue<TileBatch> writerQueue = new WorkQueue<>("mbtiles_writer_queue", queueSize, 1, stats);
encodeBranch = pipeline
.<TileBatch>fromGenerator("reader", next -> {
.<TileBatch>fromGenerator("read", next -> {
writer.readFeaturesAndBatch(batch -> {
next.accept(batch);
writerQueue.accept(batch); // also send immediately to writer
@ -130,12 +130,12 @@ public class MbtilesWriter {
// use only 1 thread since readFeaturesAndBatch needs to be single-threaded
}, 1)
.addBuffer("reader_queue", queueSize)
.sinkTo("encoder", config.threads(), writer::tileEncoderSink);
.sinkTo("encode", config.threads(), writer::tileEncoderSink);
// the tile writer will wait on the result of each batch to ensure tiles are written in order
writeBranch = pipeline.readFromQueue(writerQueue)
// use only 1 thread since tileWriter needs to be single-threaded
.sinkTo("writer", 1, writer::tileWriter);
.sinkTo("write", 1, writer::tileWriter);
} else {
/*
* If we don't need to emit tiles in order, just send the features to the encoder, and when it finishes with
@ -143,16 +143,16 @@ public class MbtilesWriter {
*/
encodeBranch = pipeline
// use only 1 thread since readFeaturesAndBatch needs to be single-threaded
.fromGenerator("reader", writer::readFeaturesAndBatch, 1)
.fromGenerator("read", writer::readFeaturesAndBatch, 1)
.addBuffer("reader_queue", queueSize)
.addWorker("encoder", config.threads(), writer::tileEncoder)
.addBuffer("writer_queue", queueSize)
// use only 1 thread since tileWriter needs to be single-threaded
.sinkTo("writer", 1, writer::tileWriter);
.sinkTo("write", 1, writer::tileWriter);
}
var loggers = ProgressLoggers.create()
.addRatePercentCounter("features", features.numFeaturesWritten(), writer.featuresProcessed)
.addRatePercentCounter("features", features.numFeaturesWritten(), writer.featuresProcessed, true)
.addFileSize(features)
.addRateCounter("tiles", writer::tilesEmitted)
.addFileSize(fileSize)

Wyświetl plik

@ -83,7 +83,7 @@ public abstract class SimpleReader implements Closeable {
});
var loggers = ProgressLoggers.create()
.addRatePercentCounter("read", featureCount, featuresRead)
.addRatePercentCounter("read", featureCount, featuresRead, true)
.addRateCounter("write", featuresWritten)
.addFileSize(writer)
.newLine()

Wyświetl plik

@ -36,6 +36,17 @@ public interface WithTags {
return value1.equals(actual) || value2.equals(actual);
}
/** Returns true if the value for {@code key} equals any one of {@code values}. */
default boolean hasTag(String key, Object... values) {
  Object actual = getTag(key);
  // linear scan; returns false for an empty values array or when the key is absent
  for (Object value : values) {
    if (value.equals(actual)) {
      return true;
    }
  }
  return false;
}
/** Returns the {@link Object#toString()} value for {@code key} or {@code null} if not present. */
default String getString(String key) {
Object value = getTag(key);

Wyświetl plik

@ -0,0 +1,37 @@
package com.onthegomap.planetiler.reader.osm;
import java.io.Closeable;
import java.util.function.Consumer;
/**
* An osm.pbf input file that iterates through {@link Block Blocks} of raw bytes that can be decompressed/parsed in
* worker threads using {@link Block#decodeElements()}.
*/
public interface OsmBlockSource extends Closeable {

  /** Calls {@code consumer} for each block from the input file sequentially in a single thread. */
  void forEachBlock(Consumer<Block> consumer);

  /** Releases any resources held by this source; the default implementation holds none and is a no-op. */
  @Override
  default void close() {
  }

  /**
   * An individual block of raw bytes from an osm.pbf file that can be decompressed/parsed with {@link
   * #decodeElements()}.
   */
  interface Block {

    /** Create a fake block from existing elements - useful for tests. */
    static Block of(Iterable<? extends OsmElement> items) {
      return () -> items;
    }

    /** Decompress and parse OSM elements from this block. */
    Iterable<? extends OsmElement> decodeElements();

    /** Sequence id of this block within the file, or {@code -1} when the source does not assign ids (e.g. {@link #of}). */
    default int id() {
      return -1;
    }
  }
}

Wyświetl plik

@ -1,23 +1,17 @@
package com.onthegomap.planetiler.reader.osm;
import com.carrotsearch.hppc.LongArrayList;
import com.graphhopper.reader.ReaderElement;
import com.graphhopper.reader.ReaderElementUtils;
import com.graphhopper.reader.ReaderNode;
import com.graphhopper.reader.ReaderRelation;
import com.graphhopper.reader.ReaderWay;
import com.onthegomap.planetiler.geo.GeoUtils;
import com.onthegomap.planetiler.reader.WithTags;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* An input element read from OpenStreetMap data.
* <p>
* Graphhopper utilities are used internally for processing OpenStreetMap data, but to avoid leaking graphhopper
* dependencies out through the exposed API, convert {@link ReaderElement} instances to OsmElements first.
*
* @see <a href="https://wiki.openstreetmap.org/wiki/Elements">OSM element data model</a>
*/
@ -26,6 +20,8 @@ public interface OsmElement extends WithTags {
/** OSM element ID */
long id();
int cost();
enum Type {
NODE, WAY, RELATION
}
@ -34,19 +30,100 @@ public interface OsmElement extends WithTags {
record Other(
@Override long id,
@Override Map<String, Object> tags
) implements OsmElement {}
) implements OsmElement {
@Override
public int cost() {
return 1 + tags.size();
}
}
/** A point on the earth's surface. */
record Node(
@Override long id,
@Override Map<String, Object> tags,
double lat,
double lon
) implements OsmElement {
final class Node implements OsmElement {
private static final long MISSING_LOCATION = Long.MIN_VALUE;
private final long id;
private final Map<String, Object> tags;
private final double lat;
private final double lon;
// bailed out of a record to make encodedLocation lazy since it is fairly expensive to compute
private long encodedLocation = MISSING_LOCATION;
public Node(
long id,
Map<String, Object> tags,
double lat,
double lon
) {
this.id = id;
this.tags = tags;
this.lat = lat;
this.lon = lon;
}
public Node(long id, double lat, double lon) {
this(id, new HashMap<>(), lat, lon);
}
@Override
public long id() {
return id;
}
@Override
public Map<String, Object> tags() {
return tags;
}
public double lat() {
return lat;
}
public double lon() {
return lon;
}
public long encodedLocation() {
if (encodedLocation == MISSING_LOCATION) {
encodedLocation = GeoUtils.encodeFlatLocation(lon, lat);
}
return encodedLocation;
}
@Override
public int cost() {
return 1 + tags.size();
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != this.getClass()) {
return false;
}
var that = (Node) obj;
return this.id == that.id &&
Objects.equals(this.tags, that.tags) &&
Double.doubleToLongBits(this.lat) == Double.doubleToLongBits(that.lat) &&
Double.doubleToLongBits(this.lon) == Double.doubleToLongBits(that.lon);
}
@Override
public int hashCode() {
return Objects.hash(id, tags, lat, lon);
}
@Override
public String toString() {
return "Node[" +
"id=" + id + ", " +
"tags=" + tags + ", " +
"lat=" + lat + ", " +
"lon=" + lon + ']';
}
}
/** An ordered list of 2-2,000 nodes that define a polyline. */
@ -59,6 +136,11 @@ public interface OsmElement extends WithTags {
public Way(long id) {
this(id, new HashMap<>(), new LongArrayList(5));
}
@Override
public int cost() {
return 1 + tags.size() + nodes.size();
}
}
/** An ordered list of nodes, ways, and other relations. */
@ -78,6 +160,11 @@ public interface OsmElement extends WithTags {
}
}
@Override
public int cost() {
return 1 + tags.size() + members.size() * 3;
}
/** A node, way, or relation contained in a relation with an optional "role" to clarify the purpose of each member. */
public record Member(
Type type,
@ -85,51 +172,4 @@ public interface OsmElement extends WithTags {
String role
) {}
}
/*
* Utilities to convert from graphhopper ReaderElements to this class to avoid leaking graphhopper APIs through
* the exposed public API.
*/
/** Converts a graphhopper {@link ReaderElement} to the matching {@link OsmElement} subtype. */
static OsmElement fromGraphhopper(ReaderElement element) {
  if (element instanceof ReaderNode node) {
    return fromGraphopper(node);
  } else if (element instanceof ReaderWay way) {
    return fromGraphopper(way);
  } else if (element instanceof ReaderRelation relation) {
    return fromGraphopper(relation);
  } else {
    // any other element type is preserved as an opaque Other with just its id and tags
    long id = element.getId();
    Map<String, Object> tags = ReaderElementUtils.getTags(element);
    return new Other(id, tags);
  }
}
/** Converts a graphhopper node to a {@link Node}. (NOTE: method name is missing an "h"; kept for compatibility.) */
static Node fromGraphopper(ReaderNode node) {
  long id = node.getId();
  Map<String, Object> tags = ReaderElementUtils.getTags(node);
  return new Node(id, tags, node.getLat(), node.getLon());
}
/** Converts a graphhopper way to a {@link Way}, keeping its node-id list. */
static Way fromGraphopper(ReaderWay way) {
  long id = way.getId();
  Map<String, Object> tags = ReaderElementUtils.getTags(way);
  return new Way(id, tags, way.getNodes());
}
/** Converts a graphhopper relation to a {@link Relation}, mapping each member's type, ref, and role. */
static Relation fromGraphopper(ReaderRelation relation) {
  long id = relation.getId();
  Map<String, Object> tags = ReaderElementUtils.getTags(relation);
  List<ReaderRelation.Member> readerMembers = relation.getMembers();
  List<Relation.Member> members = new ArrayList<>(readerMembers.size());
  for (var member : readerMembers) {
    // graphhopper encodes member types as int constants; anything unrecognized is a hard failure
    Type type = switch (member.getType()) {
      case ReaderRelation.Member.NODE -> Type.NODE;
      case ReaderRelation.Member.WAY -> Type.WAY;
      case ReaderRelation.Member.RELATION -> Type.RELATION;
      default -> throw new IllegalArgumentException("Unrecognized type: " + member.getType());
    };
    members.add(new Relation.Member(type, member.getRef(), member.getRole()));
  }
  return new Relation(id, tags, members);
}
}

Wyświetl plik

@ -0,0 +1,17 @@
package com.onthegomap.planetiler.reader.osm;
import java.time.Instant;
import java.util.List;
import org.locationtech.jts.geom.Envelope;
/**
 * Data parsed from the header block of an OSM input file.
 *
 * @param bounds declared lat/lon bounding box of the data in the file
 * @param requiredFeatures features (e.g. "DenseNodes") a reader must support in order to parse the file
 * @param optionalFeaturesList optional features the writer used that readers may ignore
 * @param writingprogram program that wrote the file — NOTE(review): field semantics mirror the osm.pbf
 *   HeaderBlock; confirm against the PBF format spec
 * @param source description of the data source recorded by the writer
 * @param instant timestamp associated with the data snapshot — presumably the replication timestamp; verify
 * @param osmosisReplicationSequenceNumber sequence number used for osmosis incremental replication
 * @param osmosisReplicationBaseUrl base URL for fetching osmosis replication diffs
 */
public record OsmHeader(
  Envelope bounds,
  List<String> requiredFeatures,
  List<String> optionalFeaturesList,
  String writingprogram,
  String source,
  Instant instant,
  long osmosisReplicationSequenceNumber,
  String osmosisReplicationBaseUrl
) {}

Wyświetl plik

@ -1,40 +1,96 @@
package com.onthegomap.planetiler.reader.osm;
import com.google.protobuf.ByteString;
import com.graphhopper.reader.ReaderElement;
import com.graphhopper.reader.osm.pbf.PbfDecoder;
import com.graphhopper.reader.osm.pbf.PbfStreamSplitter;
import com.graphhopper.reader.osm.pbf.Sink;
import com.onthegomap.planetiler.config.Bounds;
import com.onthegomap.planetiler.worker.WorkerPipeline;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.function.Consumer;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import java.util.function.Supplier;
import org.locationtech.jts.geom.Envelope;
import org.openstreetmap.osmosis.osmbinary.Fileformat.Blob;
import org.openstreetmap.osmosis.osmbinary.Fileformat.BlobHeader;
import org.openstreetmap.osmosis.osmbinary.Osmformat.HeaderBBox;
import org.openstreetmap.osmosis.osmbinary.Osmformat.HeaderBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An input file in {@code .osm.pbf} format.
*
* @see <a href="https://wiki.openstreetmap.org/wiki/PBF_Format">OSM PBF Format</a>
*/
public class OsmInputFile implements Bounds.Provider, OsmSource {
public class OsmInputFile implements Bounds.Provider, Supplier<OsmBlockSource> {
private static final Logger LOGGER = LoggerFactory.getLogger(OsmInputFile.class);
private final Path path;
private final boolean lazy;
/**
* Creates a new OSM input file reader.
*
* @param path Path to the file
* @param lazyReads If {@code true}, defers reading the actual content of each block from disk until the block is
* decoded in a worker thread.
*/
public OsmInputFile(Path path, boolean lazyReads) {
this.path = path;
lazy = lazyReads;
}
public OsmInputFile(Path path) {
this.path = path;
this(path, false);
}
/**
 * Reads the next 4 bytes from {@code channel} as a big-endian int (the osm.pbf blob-header length prefix).
 *
 * @throws IOException if end-of-file is reached before 4 bytes are available
 */
private static int readInt(FileChannel channel) throws IOException {
  ByteBuffer buf = ByteBuffer.allocate(4);
  // FileChannel.read may transfer fewer bytes than requested in a single call, so loop until the
  // buffer is full; a return of -1 means end-of-file was hit mid-read.
  while (buf.hasRemaining()) {
    if (channel.read(buf) < 0) {
      throw new IOException("Tried to read 4 bytes but only got " + buf.position());
    }
  }
  return buf.flip().getInt();
}
/**
 * Reads exactly {@code length} bytes from the current position of {@code channel}.
 *
 * @throws IOException if end-of-file is reached before {@code length} bytes are available
 */
private static byte[] readBytes(FileChannel channel, int length) throws IOException {
  ByteBuffer buf = ByteBuffer.allocate(length);
  // A single read may legally return fewer bytes than requested even before end-of-file,
  // so keep reading until the buffer is full or the channel reports EOF (-1).
  while (buf.hasRemaining()) {
    if (channel.read(buf) < 0) {
      throw new IOException("Tried to read " + length + " bytes but only got " + buf.position());
    }
  }
  return buf.flip().array();
}
/**
 * Reads exactly {@code length} bytes starting at absolute file position {@code offset} without
 * moving the channel's position (safe to call concurrently from worker threads).
 *
 * @throws IOException if end-of-file is reached before {@code length} bytes are available
 */
private static byte[] readBytes(FileChannel channel, long offset, int length) throws IOException {
  ByteBuffer buf = ByteBuffer.allocate(length);
  long position = offset;
  // Positional reads may also return fewer bytes than requested, so loop, advancing the
  // position by the number of bytes actually transferred each time; -1 means EOF mid-read.
  while (buf.hasRemaining()) {
    int read = channel.read(buf, position);
    if (read < 0) {
      throw new IOException("Tried to read " + length + " bytes at " + offset + " but only got " + buf.position());
    }
    position += read;
  }
  return buf.flip().array();
}
/** Returns a block source over the osm.pbf file at {@code path}, using the default (non-lazy) read strategy. */
public static OsmBlockSource readFrom(Path path) {
  return new OsmInputFile(path).get();
}
/**
 * Decodes the header blob and fails fast if the file declares any required feature that this
 * reader does not implement (only "OsmSchema-V0.6" and "DenseNodes" are supported).
 */
private static void validateHeader(byte[] data) {
  OsmHeader header = PbfDecoder.decodeHeader(data);
  List<String> unsupportedFeatures = header.requiredFeatures().stream()
    .filter(feature -> !"OsmSchema-V0.6".equals(feature))
    .filter(feature -> !"DenseNodes".equals(feature))
    .toList();
  if (!unsupportedFeatures.isEmpty()) {
    throw new RuntimeException("PBF file contains unsupported features " + unsupportedFeatures);
  }
}
/**
 * Reads the length-prefixed {@link BlobHeader} at the channel's current position.
 *
 * @throws IllegalArgumentException if the length prefix is negative or implausibly large
 *                                  (corrupt file or loss of sync with the block framing)
 * @throws IOException              if the bytes cannot be read
 */
private static BlobHeader readBlobHeader(FileChannel channel) throws IOException {
  int headerSize = readInt(channel);
  if (headerSize < 0) {
    // a negative size would make ByteBuffer.allocate throw a confusing error later; fail clearly here
    throw new IllegalArgumentException("Negative blob header size: " + headerSize);
  }
  if (headerSize > 64 * 1024) {
    // the PBF spec caps BlobHeader at 64 KiB
    throw new IllegalArgumentException("Header longer than 64 KiB");
  }
  byte[] headerBytes = readBytes(channel, headerSize);
  return BlobHeader.parseFrom(headerBytes);
}
/**
@ -44,86 +100,125 @@ public class OsmInputFile implements Bounds.Provider, OsmSource {
*/
@Override
public Envelope getLatLonBounds() {
try (var input = Files.newInputStream(path)) {
// Read the "bbox" field of the header block of the input file.
// https://wiki.openstreetmap.org/wiki/PBF_Format
var dataInput = new DataInputStream(input);
int headerSize = dataInput.readInt();
if (headerSize > 64 * 1024) {
throw new IllegalArgumentException("Header longer than 64 KiB: " + path);
}
byte[] buf = dataInput.readNBytes(headerSize);
BlobHeader header = BlobHeader.parseFrom(buf);
if (!header.getType().equals("OSMHeader")) {
throw new IllegalArgumentException("Expecting OSMHeader got " + header.getType() + " in " + path);
}
buf = dataInput.readNBytes(header.getDatasize());
Blob blob = Blob.parseFrom(buf);
ByteString data;
if (blob.hasRaw()) {
data = blob.getRaw();
} else if (blob.hasZlibData()) {
byte[] buf2 = new byte[blob.getRawSize()];
Inflater decompresser = new Inflater();
decompresser.setInput(blob.getZlibData().toByteArray());
decompresser.inflate(buf2);
decompresser.end();
data = ByteString.copyFrom(buf2);
} else {
throw new IllegalArgumentException("Header does not have raw or zlib data");
}
HeaderBlock headerblock = HeaderBlock.parseFrom(data);
HeaderBBox bbox = headerblock.getBbox();
// always specified in nanodegrees
return new Envelope(
bbox.getLeft() / 1e9,
bbox.getRight() / 1e9,
bbox.getBottom() / 1e9,
bbox.getTop() / 1e9
);
} catch (IOException | DataFormatException e) {
return getHeader().bounds();
}
/**
 * Reads and decodes the header block of this osm.pbf file.
 *
 * @throws IllegalArgumentException if an error is encountered reading the file
 */
public OsmHeader getHeader() {
  try (var channel = openChannel()) {
    var blobHeader = readBlobHeader(channel);
    var headerBlobBytes = readBytes(channel, blobHeader.getDatasize());
    return PbfDecoder.decodeHeader(headerBlobBytes);
  } catch (IOException e) {
    throw new IllegalArgumentException(e);
  }
}
/**
* Reads all elements from the input file using {@code threads} threads to decode blocks in parallel and writes them
* to {@code next}.
*
* @throws IOException if an error is encountered reading the file
*/
public void readTo(Consumer<ReaderElement> next, String poolName, int threads) throws IOException {
ThreadFactory threadFactory = Executors.defaultThreadFactory();
ExecutorService executorService = Executors.newFixedThreadPool(threads, (runnable) -> {
Thread thread = threadFactory.newThread(runnable);
thread.setName(poolName + "-" + thread.getName());
return thread;
});
try (var stream = new BufferedInputStream(Files.newInputStream(path), 50_000)) {
PbfStreamSplitter streamSplitter = new PbfStreamSplitter(new DataInputStream(stream));
var sink = new ReaderElementSink(next);
PbfDecoder pbfDecoder = new PbfDecoder(streamSplitter, executorService, threads + 1, sink);
pbfDecoder.run();
} finally {
executorService.shutdownNow();
}
}
/** Starts a {@link WorkerPipeline} with all elements read from this input file. */
@Override
public WorkerPipeline.SourceStep<ReaderElement> read(String poolName, int threads) {
return next -> readTo(next, poolName, threads);
public OsmBlockSource get() {
return lazy ? new LazyReader() : new EagerReader();
}
private record ReaderElementSink(Consumer<ReaderElement> queue) implements Sink {
private FileChannel openChannel() {
try {
return FileChannel.open(path, StandardOpenOption.READ);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/**
* An OSM block reader that iterates through the input file in a single thread, reading the raw bytes of each block
* and passing them off to worker threads.
*/
private class EagerReader implements OsmBlockSource {
@Override
public void process(ReaderElement readerElement) {
queue.accept(readerElement);
public void forEachBlock(Consumer<Block> consumer) {
int blockId = 0;
try (FileChannel channel = openChannel()) {
final long size = channel.size();
while (channel.position() < size) {
BlobHeader header = readBlobHeader(channel);
byte[] blockBytes = readBytes(channel, header.getDatasize());
String headerType = header.getType();
if ("OSMData".equals(headerType)) {
consumer.accept(new EagerBlock(blockId++, blockBytes));
} else if ("OSMHeader".equals(headerType)) {
validateHeader(blockBytes);
} else {
LOGGER.warn("Unrecognized OSM PBF blob header type: " + headerType);
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private record EagerBlock(@Override int id, byte[] bytes) implements Block {
public Iterable<OsmElement> decodeElements() {
return PbfDecoder.decode(bytes);
}
}
}
/**
* An OSM block reader that iterates through the input file in a single thread, skipping over each block and just
* passing the position/offset to workers so they can read the contents from disk in parallel.
* <p>
* This may result in a speedup on some systems.
*/
private class LazyReader implements OsmBlockSource {
FileChannel lazyReadChannel = openChannel();
@Override
public void forEachBlock(Consumer<Block> consumer) {
int blockId = 0;
try (FileChannel channel = openChannel()) {
final long size = channel.size();
while (channel.position() < size) {
BlobHeader header = readBlobHeader(channel);
int blockSize = header.getDatasize();
String headerType = header.getType();
long blockStartPosition = channel.position();
if ("OSMData".equals(headerType)) {
consumer.accept(new LazyBlock(blockId++, blockStartPosition, blockSize, lazyReadChannel));
} else if ("OSMHeader".equals(headerType)) {
validateHeader(readBytes(channel, blockStartPosition, blockSize));
} else {
LOGGER.warn("Unrecognized OSM PBF blob header type: " + headerType);
}
channel.position(blockStartPosition + blockSize);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void complete() {
public void close() {
try {
lazyReadChannel.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private record LazyBlock(@Override int id, long offset, int length, FileChannel channel) implements Block {
public Iterable<OsmElement> decodeElements() {
try {
return PbfDecoder.decode(readBytes(channel, offset, length));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
}
}

Wyświetl plik

@ -3,20 +3,13 @@ package com.onthegomap.planetiler.reader.osm;
import static com.onthegomap.planetiler.util.MemoryEstimator.estimateSize;
import static com.onthegomap.planetiler.util.MemoryEstimator.estimateSizeWithoutKeys;
import static com.onthegomap.planetiler.util.MemoryEstimator.estimateSizeWithoutValues;
import static com.onthegomap.planetiler.worker.Worker.joinFutures;
import com.carrotsearch.hppc.IntObjectHashMap;
import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongObjectHashMap;
import com.carrotsearch.hppc.ObjectIntHashMap;
import com.graphhopper.coll.GHIntObjectHashMap;
import com.graphhopper.coll.GHLongHashSet;
import com.graphhopper.coll.GHLongObjectHashMap;
import com.graphhopper.coll.GHObjectIntHashMap;
import com.graphhopper.reader.ReaderElement;
import com.graphhopper.reader.ReaderElementUtils;
import com.graphhopper.reader.ReaderNode;
import com.graphhopper.reader.ReaderRelation;
import com.graphhopper.reader.ReaderWay;
import com.onthegomap.planetiler.FeatureCollector;
import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.collection.FeatureGroup;
@ -29,10 +22,14 @@ import com.onthegomap.planetiler.geo.GeometryException;
import com.onthegomap.planetiler.reader.SourceFeature;
import com.onthegomap.planetiler.render.FeatureRenderer;
import com.onthegomap.planetiler.stats.Counter;
import com.onthegomap.planetiler.stats.ProcessInfo;
import com.onthegomap.planetiler.stats.ProgressLoggers;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.util.Format;
import com.onthegomap.planetiler.util.MemoryEstimator;
import com.onthegomap.planetiler.worker.Distributor;
import com.onthegomap.planetiler.worker.WeightedHandoffQueue;
import com.onthegomap.planetiler.worker.WorkQueue;
import com.onthegomap.planetiler.worker.WorkerPipeline;
import java.io.Closeable;
import java.io.IOException;
@ -42,6 +39,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.CoordinateList;
import org.locationtech.jts.geom.CoordinateSequence;
@ -69,9 +67,10 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
private static final int ROLE_SHIFT = 64 - ROLE_BITS;
private static final int ROLE_MASK = (1 << ROLE_BITS) - 1;
private static final long NOT_ROLE_MASK = (1L << ROLE_SHIFT) - 1L;
private final OsmSource osmInputFile;
private final OsmBlockSource osmBlockSource;
private final Stats stats;
private final LongLongMap nodeLocationDb;
private final Counter.Readable PASS1_BLOCKS = Counter.newSingleThreadCounter();
private final Counter.Readable PASS1_NODES = Counter.newSingleThreadCounter();
private final Counter.Readable PASS1_WAYS = Counter.newSingleThreadCounter();
private final Counter.Readable PASS1_RELATIONS = Counter.newSingleThreadCounter();
@ -82,38 +81,39 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
// for routes (750k rels 40m ways) and boundaries (650k rels, 8m ways)
// need to store route info to use later when processing ways
// <~500mb
private GHLongObjectHashMap<OsmRelationInfo> relationInfo = new GHLongObjectHashMap<>();
private LongObjectHashMap<OsmRelationInfo> relationInfo = new LongObjectHashMap<>();
// ~800mb, ~1.6GB when sorting
private LongLongMultimap wayToRelations = LongLongMultimap.newSparseUnorderedMultimap();
// for multipolygons need to store way info (20m ways, 800m nodes) to use when processing relations (4.5m)
// ~300mb
private LongHashSet waysInMultipolygon = new GHLongHashSet();
private LongHashSet waysInMultipolygon = new LongHashSet();
// ~7GB
private LongLongMultimap multipolygonWayGeometries = LongLongMultimap.newDensedOrderedMultimap();
// keep track of data needed to encode/decode role strings into a long
private final ObjectIntHashMap<String> roleIds = new GHObjectIntHashMap<>();
private final IntObjectHashMap<String> roleIdsReverse = new GHIntObjectHashMap<>();
private final ObjectIntHashMap<String> roleIds = new ObjectIntHashMap<>();
private final IntObjectHashMap<String> roleIdsReverse = new IntObjectHashMap<>();
private final AtomicLong roleSizes = new AtomicLong(0);
/**
* Constructs a new {@code OsmReader} from
*
* @param name ID for this reader to use in stats and logs
* @param osmInputFile the file to read raw nodes, ways, and relations from
* @param nodeLocationDb store that will temporarily hold node locations (encoded as a long) between passes to
* reconstruct way geometries
* @param profile logic that defines what map features to emit for each source feature
* @param stats to keep track of counters and timings
* @param name ID for this reader to use in stats and logs
* @param osmSourceProvider the file to read raw nodes, ways, and relations from
* @param nodeLocationDb store that will temporarily hold node locations (encoded as a long) between passes to
* reconstruct way geometries
* @param profile logic that defines what map features to emit for each source feature
* @param stats to keep track of counters and timings
*/
public OsmReader(String name, OsmSource osmInputFile, LongLongMap nodeLocationDb, Profile profile,
public OsmReader(String name, Supplier<OsmBlockSource> osmSourceProvider, LongLongMap nodeLocationDb, Profile profile,
Stats stats) {
this.name = name;
this.osmInputFile = osmInputFile;
this.osmBlockSource = osmSourceProvider.get();
this.nodeLocationDb = nodeLocationDb;
this.stats = stats;
this.profile = profile;
stats.monitorInMemoryObject("osm_relations", this);
stats.counter("osm_pass1_elements_processed", "type", () -> Map.of(
"blocks", PASS1_BLOCKS,
"nodes", PASS1_NODES,
"ways", PASS1_WAYS,
"relations", PASS1_RELATIONS
@ -131,89 +131,144 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
* @param config user-provided arguments to control the number of threads, and log interval
*/
public void pass1(PlanetilerConfig config) {
record BlockWithResult(OsmBlockSource.Block block, WeightedHandoffQueue<OsmElement> result) {}
var timer = stats.startStage("osm_pass1");
String pbfParsePrefix = "pbfpass1";
int parseThreads = Math.max(1, config.threads() - 2);
var pipeline = WorkerPipeline.start("osm_pass1", stats)
.fromGenerator("pbf", osmInputFile.read("pbfpass1", parseThreads))
.addBuffer("reader_queue", 50_000, 10_000)
.sinkToConsumer("process", 1, this::processPass1Element);
int pendingBlocks = parseThreads * 2;
// Each worker will hand off finished elements to the single process thread. A Future<List<OsmElement>> would result
// in too much memory usage/GC so use a WeightedHandoffQueue instead which will fill up with lightweight objects
// like nodes without any tags, but limit the number of pending heavy entities like relations
int handoffQueueBatches = Math.max(
10,
(int) (100d * ProcessInfo.getMaxMemoryBytes() / 100_000_000_000d)
);
var parsedBatches = new WorkQueue<WeightedHandoffQueue<OsmElement>>("elements", pendingBlocks, 1, stats);
var pipeline = WorkerPipeline.start("osm_pass1", stats);
var readBranch = pipeline
.<BlockWithResult>fromGenerator("read", next -> {
osmBlockSource.forEachBlock((block) -> {
WeightedHandoffQueue<OsmElement> result = new WeightedHandoffQueue<>(handoffQueueBatches, 10_000);
parsedBatches.accept(result);
next.accept(new BlockWithResult(block, result));
});
parsedBatches.close();
})
.addBuffer("pbf_blocks", pendingBlocks)
.sinkToConsumer("parse", parseThreads, block -> {
List<OsmElement> result = new ArrayList<>();
boolean nodesDone = false, waysDone = false;
for (var element : block.block.decodeElements()) {
if (element instanceof OsmElement.Node node) {
// pre-compute encoded location in worker threads since it is fairly expensive and should be done in parallel
node.encodedLocation();
if (nodesDone) {
throw new IllegalArgumentException(
"Input file must be sorted with nodes first, then ways, then relations. Encountered node " + node.id()
+ " after a way or relation");
}
} else if (element instanceof OsmElement.Way way) {
nodesDone = true;
if (waysDone) {
throw new IllegalArgumentException(
"Input file must be sorted with nodes first, then ways, then relations. Encountered way " + way.id()
+ " after a relation");
}
} else if (element instanceof OsmElement.Relation) {
nodesDone = waysDone = true;
}
block.result.accept(element, element.cost());
}
block.result.close();
});
var processBranch = pipeline
.readFromQueue(parsedBatches)
.sinkToConsumer("process", 1, this::processPass1Block);
var loggers = ProgressLoggers.create()
.addRateCounter("nodes", PASS1_NODES, true)
.addFileSizeAndRam(nodeLocationDb)
.addRateCounter("ways", PASS1_WAYS, true)
.addRateCounter("rels", PASS1_RELATIONS, true)
.addRateCounter("blocks", PASS1_BLOCKS)
.newLine()
.addProcessStats()
.addInMemoryObject("hppc", this)
.newLine()
.addThreadPoolStats("parse", pbfParsePrefix + "-pool")
.addPipelineStats(pipeline);
pipeline.awaitAndLog(loggers, config.logInterval());
LOGGER.debug(
"nodes: " + FORMAT.integer(PASS1_NODES.get()) +
" ways: " + FORMAT.integer(PASS1_WAYS.get()) +
" relations: " + FORMAT.integer(PASS1_RELATIONS.get()));
.addPipelineStats(readBranch)
.addPipelineStats(processBranch);
loggers.awaitAndLog(joinFutures(readBranch.done(), processBranch.done()), config.logInterval());
LOGGER.debug("processed " +
"blocks:" + FORMAT.integer(PASS1_BLOCKS.get()) +
" nodes:" + FORMAT.integer(PASS1_NODES.get()) +
" ways:" + FORMAT.integer(PASS1_WAYS.get()) +
" relations:" + FORMAT.integer(PASS1_RELATIONS.get()));
timer.stop();
}
void processPass1Element(ReaderElement readerElement) {
// only a single thread calls this with elements ordered by ID, so it's safe to manipulate these
// shared data structures which are not thread safe
if (readerElement.getId() < 0) {
throw new IllegalArgumentException("Negative OSM element IDs not supported: " + readerElement);
}
if (readerElement instanceof ReaderNode node) {
PASS1_NODES.inc();
try {
profile.preprocessOsmNode(OsmElement.fromGraphopper(node));
} catch (Exception e) {
LOGGER.error("Error preprocessing OSM node " + node.getId(), e);
void processPass1Block(Iterable<? extends OsmElement> elements) {
int nodes = 0, ways = 0, relations = 0;
for (OsmElement element : elements) {
// only a single thread calls this with elements ordered by ID, so it's safe to manipulate these
// shared data structures which are not thread safe
if (element.id() < 0) {
throw new IllegalArgumentException("Negative OSM element IDs not supported: " + element);
}
// TODO allow limiting node storage to only ones that profile cares about
nodeLocationDb.put(node.getId(), GeoUtils.encodeFlatLocation(node.getLon(), node.getLat()));
} else if (readerElement instanceof ReaderWay way) {
PASS1_WAYS.inc();
try {
profile.preprocessOsmWay(OsmElement.fromGraphopper(way));
} catch (Exception e) {
LOGGER.error("Error preprocessing OSM way " + way.getId(), e);
}
} else if (readerElement instanceof ReaderRelation rel) {
PASS1_RELATIONS.inc();
// don't leak graphhopper classes out through public API
OsmElement.Relation osmRelation = OsmElement.fromGraphopper(rel);
try {
List<OsmRelationInfo> infos = profile.preprocessOsmRelation(osmRelation);
if (infos != null) {
for (OsmRelationInfo info : infos) {
relationInfo.put(rel.getId(), info);
relationInfoSizes.addAndGet(info.estimateMemoryUsageBytes());
for (ReaderRelation.Member member : rel.getMembers()) {
int type = member.getType();
// TODO handle nodes in relations and super-relations
if (type == ReaderRelation.Member.WAY) {
wayToRelations.put(member.getRef(), encodeRelationMembership(member.getRole(), rel.getId()));
if (element instanceof OsmElement.Node node) {
nodes++;
try {
profile.preprocessOsmNode(node);
} catch (Exception e) {
LOGGER.error("Error preprocessing OSM node " + node.id(), e);
}
// TODO allow limiting node storage to only ones that profile cares about
nodeLocationDb.put(node.id(), node.encodedLocation());
} else if (element instanceof OsmElement.Way way) {
ways++;
try {
profile.preprocessOsmWay(way);
} catch (Exception e) {
LOGGER.error("Error preprocessing OSM way " + way.id(), e);
}
} else if (element instanceof OsmElement.Relation relation) {
relations++;
try {
List<OsmRelationInfo> infos = profile.preprocessOsmRelation(relation);
if (infos != null) {
for (OsmRelationInfo info : infos) {
relationInfo.put(relation.id(), info);
relationInfoSizes.addAndGet(info.estimateMemoryUsageBytes());
for (var member : relation.members()) {
var type = member.type();
// TODO handle nodes in relations and super-relations
if (type == OsmElement.Type.WAY) {
wayToRelations.put(member.ref(), encodeRelationMembership(member.role(), relation.id()));
}
}
}
}
} catch (Exception e) {
LOGGER.error("Error preprocessing OSM relation " + relation.id(), e);
}
} catch (Exception e) {
LOGGER.error("Error preprocessing OSM relation " + rel.getId(), e);
}
// TODO allow limiting multipolygon storage to only ones that profile cares about
if (isMultipolygon(rel)) {
for (ReaderRelation.Member member : rel.getMembers()) {
if (member.getType() == ReaderRelation.Member.WAY) {
waysInMultipolygon.add(member.getRef());
// TODO allow limiting multipolygon storage to only ones that profile cares about
if (isMultipolygon(relation)) {
for (var member : relation.members()) {
if (member.type() == OsmElement.Type.WAY) {
waysInMultipolygon.add(member.ref());
}
}
}
}
}
PASS1_BLOCKS.inc();
PASS1_NODES.incBy(nodes);
PASS1_WAYS.incBy(ways);
PASS1_RELATIONS.incBy(relations);
}
private static boolean isMultipolygon(ReaderRelation relation) {
private static boolean isMultipolygon(OsmElement.Relation relation) {
return relation.hasTag("type", "multipolygon", "boundary", "land_area");
}
@ -226,12 +281,13 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
public void pass2(FeatureGroup writer, PlanetilerConfig config) {
var timer = stats.startStage("osm_pass2");
int threads = config.threads();
int readerThreads = Math.max(threads / 4, 1);
int processThreads = threads - (threads >= 4 ? 1 : 0);
int processThreads = Math.max(threads < 4 ? threads : (threads - 1), 1);
Counter.MultiThreadCounter blocksProcessed = Counter.newMultiThreadCounter();
Counter.MultiThreadCounter nodesProcessed = Counter.newMultiThreadCounter();
Counter.MultiThreadCounter waysProcessed = Counter.newMultiThreadCounter();
Counter.MultiThreadCounter relsProcessed = Counter.newMultiThreadCounter();
stats.counter("osm_pass2_elements_processed", "type", () -> Map.of(
"blocks", blocksProcessed,
"nodes", nodesProcessed,
"ways", waysProcessed,
"relations", relsProcessed
@ -239,90 +295,93 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
// since multiple threads process OSM elements, and we must process all ways before processing any relations,
// use a count down latch to wait for all threads to finish processing ways
CountDownLatch waysDone = new CountDownLatch(processThreads);
CountDownLatch waitForWays = new CountDownLatch(processThreads);
// Use a Distributor to keep all worker threads busy when processing the final blocks of relations by offloading
// items to threads that are done reading blocks
Distributor<OsmElement.Relation> relationDistributor = Distributor.createWithCapacity(1_000);
String parseThreadPrefix = "pbfpass2";
var pipeline = WorkerPipeline.start("osm_pass2", stats)
.fromGenerator("pbf", osmInputFile.read(parseThreadPrefix, readerThreads))
// TODO should use an adaptive batch size to better utilize lots of cpus:
// - make queue size proportional to cores
// - much larger batches when processing points
// - slightly larger batches when processing ways
// - 1_000 is probably fine for relations
.addBuffer("reader_queue", 50_000, 1_000)
.fromGenerator("read", osmBlockSource::forEachBlock)
.addBuffer("pbf_blocks", 100)
.<SortableFeature>addWorker("process", processThreads, (prev, next) -> {
// avoid contention trying to get the thread-local counters by getting them once when thread starts
Counter blocks = blocksProcessed.counterForThread();
Counter nodes = nodesProcessed.counterForThread();
Counter ways = waysProcessed.counterForThread();
Counter rels = relsProcessed.counterForThread();
var waysDone = false;
var featureCollectors = new FeatureCollector.Factory(config, stats);
NodeLocationProvider nodeLocations = newNodeLocationProvider();
final NodeLocationProvider nodeLocations = newNodeLocationProvider();
FeatureRenderer renderer = createFeatureRenderer(writer, config, next);
for (ReaderElement readerElement : prev) {
SourceFeature feature = null;
if (readerElement instanceof ReaderNode node) {
nodes.inc();
feature = processNodePass2(node);
} else if (readerElement instanceof ReaderWay way) {
ways.inc();
feature = processWayPass2(way, nodeLocations);
} else if (readerElement instanceof ReaderRelation rel) {
// ensure all ways finished processing before we start relations
if (waysDone.getCount() > 0) {
waysDone.countDown();
waysDone.await();
}
rels.inc();
feature = processRelationPass2(rel, nodeLocations);
}
// render features specified by profile and hand them off to next step that will
// write them intermediate storage
var relationHandler = relationDistributor.forThread(relation -> {
var feature = processRelationPass2(relation, nodeLocations);
if (feature != null) {
FeatureCollector features = featureCollectors.get(feature);
try {
profile.processFeature(feature, features);
for (FeatureCollector.Feature renderable : features) {
renderer.accept(renderable);
render(featureCollectors, renderer, relation, feature);
}
rels.inc();
});
for (var block : prev) {
int blockNodes = 0, blockWays = 0;
for (var element : block.decodeElements()) {
SourceFeature feature = null;
if (element instanceof OsmElement.Node node) {
blockNodes++;
feature = processNodePass2(node);
} else if (element instanceof OsmElement.Way way) {
blockWays++;
feature = processWayPass2(way, nodeLocations);
} else if (element instanceof OsmElement.Relation relation) {
// ensure all ways finished processing before we start relations
if (!waysDone && waitForWays.getCount() > 0) {
waitForWays.countDown();
waitForWays.await();
waysDone = true;
}
} catch (Exception e) {
String type = switch (readerElement.getType()) {
case ReaderElement.NODE -> "node";
case ReaderElement.WAY -> "way";
case ReaderElement.RELATION -> "relation";
default -> "element";
};
LOGGER.error("Error processing OSM " + type + " " + readerElement.getId(), e);
relationHandler.accept(relation);
}
// render features specified by profile and hand them off to next step that will
// write them intermediate storage
if (feature != null) {
render(featureCollectors, renderer, element, feature);
}
}
}
blocks.inc();
nodes.incBy(blockNodes);
ways.incBy(blockWays);
}
// just in case a worker skipped over all relations
waysDone.countDown();
waitForWays.countDown();
// do work for other threads that are still processing blocks of relations
relationHandler.close();
}).addBuffer("feature_queue", 50_000, 1_000)
// FeatureGroup writes need to be single-threaded
.sinkToConsumer("write", 1, writer);
var logger = ProgressLoggers.create()
.addRatePercentCounter("nodes", PASS1_NODES.get(), nodesProcessed)
.addRatePercentCounter("nodes", PASS1_NODES.get(), nodesProcessed, true)
.addFileSizeAndRam(nodeLocationDb)
.addRatePercentCounter("ways", PASS1_WAYS.get(), waysProcessed)
.addRatePercentCounter("rels", PASS1_RELATIONS.get(), relsProcessed)
.addRatePercentCounter("ways", PASS1_WAYS.get(), waysProcessed, true)
.addRatePercentCounter("rels", PASS1_RELATIONS.get(), relsProcessed, true)
.addRateCounter("features", writer::numFeaturesWritten)
.addFileSize(writer)
.addRatePercentCounter("blocks", PASS1_BLOCKS.get(), blocksProcessed, false)
.newLine()
.addProcessStats()
.addInMemoryObject("hppc", this)
.newLine()
.addThreadPoolStats("parse", parseThreadPrefix + "-pool")
.addPipelineStats(pipeline);
pipeline.awaitAndLog(logger, config.logInterval());
LOGGER.debug(
"nodes: " + FORMAT.integer(nodesProcessed.get()) +
" ways: " + FORMAT.integer(waysProcessed.get()) +
" relations: " + FORMAT.integer(relsProcessed.get()));
LOGGER.debug("processed " +
"blocks:" + FORMAT.integer(blocksProcessed.get()) +
" nodes:" + FORMAT.integer(nodesProcessed.get()) +
" ways:" + FORMAT.integer(waysProcessed.get()) +
" relations:" + FORMAT.integer(relsProcessed.get()));
timer.stop();
@ -335,6 +394,20 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
}
}
/**
 * Asks the profile what map features {@code feature} should produce, then hands each rendered
 * feature off to {@code renderer} for the next pipeline stage.
 */
private void render(FeatureCollector.Factory featureCollectors, FeatureRenderer renderer, OsmElement element,
  SourceFeature feature) {
  var collector = featureCollectors.get(feature);
  try {
    profile.processFeature(feature, collector);
    for (FeatureCollector.Feature renderable : collector) {
      renderer.accept(renderable);
    }
  } catch (Exception e) {
    // log and keep going so one bad element doesn't abort the whole pass
    LOGGER.error("Error processing OSM " + element.getClass().getSimpleName() + " " + element.id(), e);
  }
}
private FeatureRenderer createFeatureRenderer(FeatureGroup writer, PlanetilerConfig config,
Consumer<SortableFeature> next) {
var encoder = writer.newRenderedFeatureEncoder();
@ -345,34 +418,34 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
);
}
SourceFeature processNodePass2(ReaderNode node) {
SourceFeature processNodePass2(OsmElement.Node node) {
// nodes are simple because they already contain their location
return new NodeSourceFeature(node);
}
SourceFeature processWayPass2(ReaderWay way, NodeLocationProvider nodeLocations) {
SourceFeature processWayPass2(OsmElement.Way way, NodeLocationProvider nodeLocations) {
// ways contain an ordered list of node IDs, so we need to join that with node locations
// from pass1 to reconstruct the geometry.
LongArrayList nodes = way.getNodes();
if (waysInMultipolygon.contains(way.getId())) {
LongArrayList nodes = way.nodes();
if (waysInMultipolygon.contains(way.id())) {
// if this is part of a multipolygon, store the node IDs for this way ID so that when
// we get to the multipolygon we can go from way IDs -> node IDs -> node locations.
synchronized (this) { // multiple threads may update this concurrently
multipolygonWayGeometries.putAll(way.getId(), nodes);
multipolygonWayGeometries.putAll(way.id(), nodes);
}
}
boolean closed = nodes.size() > 1 && nodes.get(0) == nodes.get(nodes.size() - 1);
// area tag used to differentiate between whether a closed way should be treated as a polygon or linestring
String area = way.getTag("area");
List<RelationMember<OsmRelationInfo>> rels = getRelationMembershipForWay(way.getId());
String area = way.getString("area");
List<RelationMember<OsmRelationInfo>> rels = getRelationMembershipForWay(way.id());
return new WaySourceFeature(way, closed, area, nodeLocations, rels);
}
SourceFeature processRelationPass2(ReaderRelation rel, NodeLocationProvider nodeLocations) {
SourceFeature processRelationPass2(OsmElement.Relation rel, NodeLocationProvider nodeLocations) {
// Relation info gets used during way processing, except multipolygons which we have to process after we've
// stored all the node IDs for each way.
if (isMultipolygon(rel)) {
List<RelationMember<OsmRelationInfo>> parentRelations = getRelationMembershipForWay(rel.getId());
List<RelationMember<OsmRelationInfo>> parentRelations = getRelationMembershipForWay(rel.id());
return new MultipolygonSourceFeature(rel, nodeLocations, parentRelations);
} else {
return null;
@ -420,6 +493,7 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
nodeLocationDb.close();
roleIds.release();
roleIdsReverse.release();
osmBlockSource.close();
}
NodeLocationProvider newNodeLocationProvider() {
@ -484,9 +558,9 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
private Geometry latLonGeom;
private Geometry worldGeom;
public OsmFeature(ReaderElement elem, boolean point, boolean line, boolean polygon,
public OsmFeature(OsmElement elem, boolean point, boolean line, boolean polygon,
List<RelationMember<OsmRelationInfo>> relationInfo) {
super(ReaderElementUtils.getTags(elem), name, null, relationInfo, elem.getId());
super(elem.tags(), name, null, relationInfo, elem.id());
this.point = point;
this.line = line;
this.polygon = polygon;
@ -523,20 +597,18 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
/** A {@link Point} created from an OSM node. */
private class NodeSourceFeature extends OsmFeature {
private final double lon;
private final double lat;
private final long encodedLocation;
NodeSourceFeature(ReaderNode node) {
NodeSourceFeature(OsmElement.Node node) {
super(node, true, false, false, null);
this.lon = node.getLon();
this.lat = node.getLat();
this.encodedLocation = node.encodedLocation();
}
@Override
protected Geometry computeWorldGeometry() {
return GeoUtils.point(
GeoUtils.getWorldX(lon),
GeoUtils.getWorldY(lat)
GeoUtils.decodeWorldX(encodedLocation),
GeoUtils.decodeWorldY(encodedLocation)
);
}
@ -573,14 +645,14 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
private final NodeLocationProvider nodeLocations;
private final LongArrayList nodeIds;
public WaySourceFeature(ReaderWay way, boolean closed, String area, NodeLocationProvider nodeLocations,
public WaySourceFeature(OsmElement.Way way, boolean closed, String area, NodeLocationProvider nodeLocations,
List<RelationMember<OsmRelationInfo>> relationInfo) {
super(way, false,
OsmReader.canBeLine(closed, area, way.getNodes().size()),
OsmReader.canBePolygon(closed, area, way.getNodes().size()),
OsmReader.canBeLine(closed, area, way.nodes().size()),
OsmReader.canBePolygon(closed, area, way.nodes().size()),
relationInfo
);
this.nodeIds = way.getNodes();
this.nodeIds = way.nodes();
this.nodeLocations = nodeLocations;
}
@ -622,10 +694,10 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
*/
private class MultipolygonSourceFeature extends OsmFeature {
private final ReaderRelation relation;
private final OsmElement.Relation relation;
private final NodeLocationProvider nodeLocations;
public MultipolygonSourceFeature(ReaderRelation relation, NodeLocationProvider nodeLocations,
public MultipolygonSourceFeature(OsmElement.Relation relation, NodeLocationProvider nodeLocations,
List<RelationMember<OsmRelationInfo>> parentRelations) {
super(relation, false, false, true, parentRelations);
this.relation = relation;
@ -634,17 +706,17 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
@Override
protected Geometry computeWorldGeometry() throws GeometryException {
List<LongArrayList> rings = new ArrayList<>(relation.getMembers().size());
for (ReaderRelation.Member member : relation.getMembers()) {
String role = member.getRole();
LongArrayList poly = multipolygonWayGeometries.get(member.getRef());
if (member.getType() == ReaderRelation.Member.WAY) {
List<LongArrayList> rings = new ArrayList<>(relation.members().size());
for (OsmElement.Relation.Member member : relation.members()) {
String role = member.role();
LongArrayList poly = multipolygonWayGeometries.get(member.ref());
if (member.type() == OsmElement.Type.WAY) {
if (poly != null && !poly.isEmpty()) {
rings.add(poly);
} else if (relation.hasTag("type", "multipolygon")) {
// boundary and land_area relations might not be complete for extracts, but multipolygons should be
LOGGER.warn(
"Missing " + role + " OsmWay[" + member.getRef() + "] for " + relation.getTag("type") + " " + this);
"Missing " + role + " OsmWay[" + member.ref() + "] for " + relation.getTag("type") + " " + this);
}
}
}

Wyświetl plik

@ -1,16 +0,0 @@
package com.onthegomap.planetiler.reader.osm;
import com.graphhopper.reader.ReaderElement;
import com.onthegomap.planetiler.worker.WorkerPipeline;
/**
 * A source of raw OSM elements ({@link ReaderElement}) that can feed the first stage of a
 * {@link WorkerPipeline}.
 */
public interface OsmSource {
/**
 * Returns a source step that initiates a {@link WorkerPipeline} by emitting raw OSM elements.
 *
 * @param poolName string ID used when naming the worker threads that decode OSM blocks
 * @param threads maximum number of threads to use when decoding blocks in parallel
 * @return work for a source thread that emits each parsed element downstream
 */
WorkerPipeline.SourceStep<ReaderElement> read(String poolName, int threads);
}

Wyświetl plik

@ -0,0 +1,297 @@
// This software is released into the Public Domain.
// See NOTICE.md here or copying.txt from https://github.com/openstreetmap/osmosis/blob/master/package/copying.txt for details.
package com.onthegomap.planetiler.reader.osm;
import com.carrotsearch.hppc.LongArrayList;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.IntUnaryOperator;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import org.locationtech.jts.geom.Envelope;
import org.openstreetmap.osmosis.osmbinary.Fileformat;
import org.openstreetmap.osmosis.osmbinary.Osmformat;
/**
* Converts PBF block data into decoded entities. This class was adapted from Osmosis to expose an iterator over blocks
* to give more control over the parallelism.
*
* @author Brett Henderson
*/
public class PbfDecoder implements Iterable<OsmElement> {

  private final Osmformat.PrimitiveBlock block;
  private final PbfFieldDecoder fieldDecoder;

  private PbfDecoder(byte[] rawBlob) throws IOException {
    byte[] data = readBlobContent(rawBlob);
    block = Osmformat.PrimitiveBlock.parseFrom(data);
    fieldDecoder = new PbfFieldDecoder(block);
  }

  /**
   * Extracts the payload of a PBF blob, decompressing with zlib if necessary.
   *
   * @param input the raw bytes of a {@code Fileformat.Blob}
   * @return the uncompressed blob payload
   * @throws IOException if the blob cannot be parsed
   * @throws RuntimeException if the blob uses an unsupported compression scheme or the compressed data is truncated
   */
  private static byte[] readBlobContent(byte[] input) throws IOException {
    Fileformat.Blob blob = Fileformat.Blob.parseFrom(input);
    byte[] blobData;

    if (blob.hasRaw()) {
      blobData = blob.getRaw().toByteArray();
    } else if (blob.hasZlibData()) {
      Inflater inflater = new Inflater();
      inflater.setInput(blob.getZlibData().toByteArray());
      blobData = new byte[blob.getRawSize()];
      try {
        inflater.inflate(blobData);
      } catch (DataFormatException e) {
        throw new RuntimeException("Unable to decompress PBF blob.", e);
      }
      if (!inflater.finished()) {
        throw new RuntimeException("PBF blob contains incomplete compressed data.");
      }
      inflater.end();
    } else {
      throw new RuntimeException("PBF blob uses unsupported compression, only raw or zlib may be used.");
    }

    return blobData;
  }

  /** Decompresses and parses a block of primitive OSM elements. */
  public static Iterable<OsmElement> decode(byte[] raw) {
    try {
      return new PbfDecoder(raw);
    } catch (IOException e) {
      throw new UncheckedIOException("Unable to process PBF blob", e);
    }
  }

  /** Decompresses and parses a header block of an OSM input file. */
  public static OsmHeader decodeHeader(byte[] raw) {
    try {
      byte[] data = readBlobContent(raw);
      Osmformat.HeaderBlock header = Osmformat.HeaderBlock.parseFrom(data);
      Osmformat.HeaderBBox bbox = header.getBbox();
      // header bounding box coordinates are stored in nanodegrees
      Envelope bounds = new Envelope(
        bbox.getLeft() / 1e9,
        bbox.getRight() / 1e9,
        bbox.getBottom() / 1e9,
        bbox.getTop() / 1e9
      );
      // NOTE(review): required features are surfaced to the caller rather than validated here
      return new OsmHeader(
        bounds,
        header.getRequiredFeaturesList(),
        header.getOptionalFeaturesList(),
        header.getWritingprogram(),
        header.getSource(),
        Instant.ofEpochSecond(header.getOsmosisReplicationTimestamp()),
        header.getOsmosisReplicationSequenceNumber(),
        header.getOsmosisReplicationBaseUrl()
      );
    } catch (IOException e) {
      throw new UncheckedIOException("Unable to decode PBF header", e);
    }
  }

  @Override
  public Iterator<OsmElement> iterator() {
    // within each primitive group, emit dense nodes first, then regular nodes, ways, and relations
    return Iterators.concat(block.getPrimitivegroupList().stream().map(primitiveGroup -> Iterators.concat(
      new DenseNodeIterator(primitiveGroup.getDense()),
      new NodeIterator(primitiveGroup.getNodesList()),
      new WayIterator(primitiveGroup.getWaysList()),
      new RelationIterator(primitiveGroup.getRelationsList())
    )).iterator());
  }

  /**
   * Builds a tag map for an element by resolving {@code num} parallel key/value string-table indexes
   * through {@link PbfFieldDecoder#decodeString(int)}.
   */
  private Map<String, Object> buildTags(int num, IntUnaryOperator key, IntUnaryOperator value) {
    if (num > 0) {
      Map<String, Object> tags = new HashMap<>(num);
      for (int i = 0; i < num; i++) {
        String k = fieldDecoder.decodeString(key.applyAsInt(i));
        String v = fieldDecoder.decodeString(value.applyAsInt(i));
        tags.put(k, v);
      }
      return tags;
    }
    return Collections.emptyMap();
  }

  /** Iterates over the non-dense nodes of a primitive group. */
  private class NodeIterator implements Iterator<OsmElement.Node> {

    private final List<Osmformat.Node> nodes;
    int i;

    public NodeIterator(List<Osmformat.Node> nodes) {
      this.nodes = nodes;
      i = 0;
    }

    @Override
    public boolean hasNext() {
      return i < nodes.size();
    }

    @Override
    public OsmElement.Node next() {
      if (!hasNext()) {
        // Iterator contract: signal exhaustion explicitly instead of IndexOutOfBoundsException
        throw new NoSuchElementException();
      }
      var node = nodes.get(i++);
      return new OsmElement.Node(
        node.getId(),
        buildTags(node.getKeysCount(), node::getKeys, node::getVals),
        fieldDecoder.decodeLatitude(node.getLat()),
        fieldDecoder.decodeLongitude(node.getLon())
      );
    }
  }

  /** Iterates over the relations of a primitive group. */
  private class RelationIterator implements Iterator<OsmElement.Relation> {

    private final List<Osmformat.Relation> relations;
    int i;

    public RelationIterator(List<Osmformat.Relation> relations) {
      this.relations = relations;
      i = 0;
    }

    @Override
    public boolean hasNext() {
      return i < relations.size();
    }

    @Override
    public OsmElement.Relation next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      var relation = relations.get(i++);
      int num = relation.getMemidsCount();

      List<OsmElement.Relation.Member> members = new ArrayList<>(num);

      // member IDs are delta-encoded: each one is stored as an offset from the previous member's ID
      long memberId = 0;
      for (int j = 0; j < num; j++) {
        memberId += relation.getMemids(j);
        var memberType = switch (relation.getTypes(j)) {
          case WAY -> OsmElement.Type.WAY;
          case NODE -> OsmElement.Type.NODE;
          case RELATION -> OsmElement.Type.RELATION;
        };
        members.add(new OsmElement.Relation.Member(
          memberType,
          memberId,
          fieldDecoder.decodeString(relation.getRolesSid(j))
        ));
      }

      return new OsmElement.Relation(
        relation.getId(),
        buildTags(relation.getKeysCount(), relation::getKeys, relation::getVals),
        members
      );
    }
  }

  /** Iterates over the ways of a primitive group. */
  private class WayIterator implements Iterator<OsmElement.Way> {

    private final List<Osmformat.Way> ways;
    int i;

    public WayIterator(List<Osmformat.Way> ways) {
      this.ways = ways;
      i = 0;
    }

    @Override
    public boolean hasNext() {
      return i < ways.size();
    }

    @Override
    public OsmElement.Way next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      var way = ways.get(i++);

      // Build up the list of way nodes for the way. The node ids are
      // delta encoded meaning that each id is stored as a delta against
      // the previous one.
      long nodeId = 0;
      int numNodes = way.getRefsCount();
      LongArrayList wayNodesList = new LongArrayList(numNodes);
      // write straight into the hppc backing buffer to avoid per-element bounds checks
      wayNodesList.elementsCount = numNodes;
      long[] wayNodes = wayNodesList.buffer;
      for (int j = 0; j < numNodes; j++) {
        nodeId += way.getRefs(j);
        wayNodes[j] = nodeId;
      }

      return new OsmElement.Way(
        way.getId(),
        buildTags(way.getKeysCount(), way::getKeys, way::getVals),
        wayNodesList
      );
    }
  }

  /** Iterates over the densely-encoded nodes of a primitive group. */
  private class DenseNodeIterator implements Iterator<OsmElement.Node> {

    final Osmformat.DenseNodes nodes;
    // running delta-decoded values carried across next() calls
    long nodeId;
    long latitude;
    long longitude;
    int i;
    // cursor into the shared keys/values index array
    int kvIndex;

    public DenseNodeIterator(Osmformat.DenseNodes nodes) {
      this.nodes = nodes;
      nodeId = 0;
      latitude = 0;
      longitude = 0;
      i = 0;
      kvIndex = 0;
    }

    @Override
    public boolean hasNext() {
      return i < nodes.getIdCount();
    }

    @Override
    public OsmElement.Node next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      // Delta decode node fields.
      nodeId += nodes.getId(i);
      latitude += nodes.getLat(i);
      longitude += nodes.getLon(i);
      i++;

      // Build the tags. The key and value string indexes are sequential
      // in the same PBF array. Each set of tags is delimited by an index
      // with a value of 0.
      Map<String, Object> tags = null;
      while (kvIndex < nodes.getKeysValsCount()) {
        int keyIndex = nodes.getKeysVals(kvIndex++);
        if (keyIndex == 0) {
          break;
        }
        int valueIndex = nodes.getKeysVals(kvIndex++);
        if (tags == null) {
          // most nodes have only a few tags
          tags = new HashMap<>(4);
        }
        tags.put(fieldDecoder.decodeString(keyIndex), fieldDecoder.decodeString(valueIndex));
      }

      // apply the block's granularity/offset instead of assuming the defaults,
      // matching how NodeIterator decodes coordinates
      return new OsmElement.Node(
        nodeId,
        tags == null ? Collections.emptyMap() : tags,
        fieldDecoder.decodeLatitude(latitude),
        fieldDecoder.decodeLongitude(longitude)
      );
    }
  }
}

Wyświetl plik

@ -0,0 +1,87 @@
// This software is released into the Public Domain.
// See NOTICE.md here or copying.txt from https://github.com/openstreetmap/osmosis/blob/master/package/copying.txt for details.
package com.onthegomap.planetiler.reader.osm;
import java.util.Date;
import org.openstreetmap.osmosis.osmbinary.Osmformat;
/**
 * Decodes the lower-level PBF data structures (coordinates, timestamps, and string-table
 * references) for a single {@code Osmformat.PrimitiveBlock}.
 * <p>
 * This class is copied from Osmosis.
 *
 * @author Brett Henderson
 */
public class PbfFieldDecoder {
  private static final double COORDINATE_SCALING_FACTOR = 0.000000001;
  private final String[] strings;
  private final int coordGranularity;
  private final long coordLatitudeOffset;
  private final long coordLongitudeOffset;
  private final int dateGranularity;

  /**
   * Creates a decoder for the granularity, offsets, and string table of one primitive block.
   *
   * @param primitiveBlock the primitive block containing the fields to be decoded
   */
  public PbfFieldDecoder(Osmformat.PrimitiveBlock primitiveBlock) {
    coordGranularity = primitiveBlock.getGranularity();
    coordLatitudeOffset = primitiveBlock.getLatOffset();
    coordLongitudeOffset = primitiveBlock.getLonOffset();
    dateGranularity = primitiveBlock.getDateGranularity();

    // Materialize the block's string table once up front so tag lookups are plain array reads.
    Osmformat.StringTable stringTable = primitiveBlock.getStringtable();
    int count = stringTable.getSCount();
    strings = new String[count];
    for (int index = 0; index < count; index++) {
      strings[index] = stringTable.getS(index).toStringUtf8();
    }
  }

  /**
   * Converts a raw PBF latitude value into degrees.
   *
   * @param rawLatitude the PBF encoded value
   * @return the latitude in degrees
   */
  public double decodeLatitude(long rawLatitude) {
    return (coordLatitudeOffset + coordGranularity * rawLatitude) * COORDINATE_SCALING_FACTOR;
  }

  /**
   * Converts a raw PBF longitude value into degrees.
   *
   * @param rawLongitude the PBF encoded value
   * @return the longitude in degrees
   */
  public double decodeLongitude(long rawLongitude) {
    return (coordLongitudeOffset + coordGranularity * rawLongitude) * COORDINATE_SCALING_FACTOR;
  }

  /**
   * Converts a raw PBF timestamp value into a {@link Date}.
   *
   * @param rawTimestamp the PBF encoded timestamp
   * @return the timestamp as a Date
   */
  public Date decodeTimestamp(long rawTimestamp) {
    return new Date(rawTimestamp * dateGranularity);
  }

  /**
   * Looks up an index in this block's string table.
   *
   * @param rawString the string table index
   * @return the decoded string
   */
  public String decodeString(int rawString) {
    return strings[rawString];
  }
}

Wyświetl plik

@ -38,7 +38,8 @@ public record ProcessTime(Duration wall, Optional<Duration> cpu, Duration gc) {
Optional<String> deltaCpu = cpu.map(format::duration);
String avgCpus = cpu.map(cpuTime -> " avg:" + format.decimal(cpuTime.toNanos() * 1d / wall.toNanos()))
.orElse("");
return format.duration(wall) + " cpu:" + deltaCpu.orElse("-") + " gc:" + format.duration(gc) + avgCpus;
String gcString = gc.compareTo(Duration.ofSeconds(1)) > 0 ? (" gc:" + format.duration(gc)) : "";
return format.duration(wall) + " cpu:" + deltaCpu.orElse("-") + gcString + avgCpus;
}
@Override

Wyświetl plik

@ -124,24 +124,24 @@ public class ProgressLoggers {
* Adds "name: [ numCompleted pctComplete% rate/s ]" to the logger where {@code total} is the total number of items to
* process.
*/
public ProgressLoggers addRatePercentCounter(String name, long total, AtomicLong value) {
return addRatePercentCounter(name, total, value::get);
public ProgressLoggers addRatePercentCounter(String name, long total, AtomicLong value, boolean color) {
return addRatePercentCounter(name, total, value::get, color);
}
/**
* Adds "name: [ numCompleted pctComplete% rate/s ]" to the logger where {@code total} is the total number of items to
* process.
*/
public ProgressLoggers addRatePercentCounter(String name, long total, LongSupplier getValue) {
return addRatePercentCounter(name, total, getValue, n -> format.numeric(n, true));
public ProgressLoggers addRatePercentCounter(String name, long total, LongSupplier getValue, boolean color) {
return addRatePercentCounter(name, total, getValue, n -> format.numeric(n, true), color);
}
/**
* Adds "name: [ numCompleted pctComplete% rate/s ]" to the logger where {@code total} is the total number of bytes to
* process.
*/
public ProgressLoggers addStorageRatePercentCounter(String name, long total, LongSupplier getValue) {
return addRatePercentCounter(name, total, getValue, n -> format.storage(n, true));
public ProgressLoggers addStorageRatePercentCounter(String name, long total, LongSupplier getValue, boolean color) {
return addRatePercentCounter(name, total, getValue, n -> format.storage(n, true), color);
}
/**
@ -149,10 +149,10 @@ public class ProgressLoggers {
* process.
*/
public ProgressLoggers addRatePercentCounter(String name, long total, LongSupplier getValue,
Function<Number, String> formatter) {
Function<Number, String> formatter, boolean color) {
// if there's no total, we can't show progress so fall back to rate logger instead
if (total == 0) {
return addRateCounter(name, getValue, true);
return addRateCounter(name, getValue, color);
}
AtomicLong last = new AtomicLong(getValue.getAsLong());
AtomicLong lastTime = new AtomicLong(System.nanoTime());
@ -169,7 +169,7 @@ public class ProgressLoggers {
String result =
"[ " + formatter.apply(valueNow) + " " + padLeft(format.percent(1f * valueNow / total), 4)
+ " " + formatter.apply(valueDiff / timeDiff) + "/s ]";
return valueDiff > 0 ? green(result) : result;
return (color && valueDiff > 0) ? green(result) : result;
}));
return this;
}
@ -366,6 +366,7 @@ public class ProgressLoggers {
while (!await(future, logInterval)) {
log();
}
log();
}
/** Returns true if the future is done, false if {@code duration} has elapsed. */

Wyświetl plik

@ -30,8 +30,10 @@ public class Timers {
String name = entry.getKey();
var elapsed = entry.getValue().timer.elapsed();
LOGGER.info("\t" + Format.padRight(name, maxLength) + " " + elapsed);
for (String detail : getStageDetails(name, true)) {
LOGGER.info("\t " + detail);
if (elapsed.wall().compareTo(Duration.ofSeconds(1)) > 0) {
for (String detail : getStageDetails(name, true)) {
LOGGER.info("\t " + detail);
}
}
}
}
@ -59,7 +61,7 @@ public class Timers {
.append(Format.padLeft(Integer.toString(num), 2))
.append("x(")
.append(FORMAT.percent(sum.cpuTime().toNanos() / totalNanos))
.append(" cpu:")
.append(" ")
.append(FORMAT.duration(sum.cpuTime().dividedBy(num)));
Duration systemTime = sum.cpuTime().minus(sum.userTime()).dividedBy(num);

Wyświetl plik

@ -168,7 +168,7 @@ public class Downloader {
for (var toDownload : toDownloadList) {
try {
long size = toDownload.metadata.get(10, TimeUnit.SECONDS).size;
loggers.addStorageRatePercentCounter(toDownload.id, size, toDownload::bytesDownloaded);
loggers.addStorageRatePercentCounter(toDownload.id, size, toDownload::bytesDownloaded, true);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IllegalStateException("Error getting size of " + toDownload.url, e);
}

Wyświetl plik

@ -14,10 +14,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.graphhopper.coll.GHLongObjectHashMap;
import com.graphhopper.reader.ReaderElement;
import com.graphhopper.util.StopWatch;
import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.reader.osm.OsmBlockSource;
import com.onthegomap.planetiler.reader.osm.OsmElement;
import com.onthegomap.planetiler.reader.osm.OsmInputFile;
import com.onthegomap.planetiler.stats.Counter;
@ -61,6 +61,7 @@ public class Wikidata {
private static final Logger LOGGER = LoggerFactory.getLogger(Wikidata.class);
private static final Pattern wikidataIRIMatcher = Pattern.compile("http://www.wikidata.org/entity/Q([0-9]+)");
private static final Pattern qidPattern = Pattern.compile("Q([0-9]+)");
private final Counter.Readable blocks = Counter.newMultiThreadCounter();
private final Counter.Readable nodes = Counter.newMultiThreadCounter();
private final Counter.Readable ways = Counter.newMultiThreadCounter();
private final Counter.Readable rels = Counter.newMultiThreadCounter();
@ -111,22 +112,22 @@ public class Wikidata {
*/
public static void fetch(OsmInputFile infile, Path outfile, PlanetilerConfig config, Profile profile, Stats stats) {
var timer = stats.startStage("wikidata");
int threadsAvailable = Math.max(1, config.threads() - 2);
int processThreads = Math.max(1, threadsAvailable / 2);
int readerThreads = Math.max(1, threadsAvailable - processThreads);
LOGGER
.info("Starting with " + readerThreads + " reader threads and " + processThreads + " process threads");
int processThreads = Math.max(1, config.threads() - 1);
LOGGER.info("Starting with " + processThreads + " process threads");
WikidataTranslations oldMappings = load(outfile);
try (Writer writer = Files.newBufferedWriter(outfile)) {
try (
Writer writer = Files.newBufferedWriter(outfile);
OsmBlockSource osmSource = infile.get()
) {
HttpClient client = HttpClient.newBuilder().connectTimeout(config.httpTimeout()).build();
Wikidata fetcher = new Wikidata(writer, Client.wrap(client), 5_000, profile, config);
fetcher.loadExisting(oldMappings);
String pbfParsePrefix = "pbfwikidata";
var pipeline = WorkerPipeline.start("wikidata", stats)
.fromGenerator("pbf", infile.read(pbfParsePrefix, readerThreads))
.addBuffer("reader_queue", 50_000, 10_000)
.fromGenerator("pbf", osmSource::forEachBlock)
.addBuffer("pbf_blocks", processThreads * 2)
.addWorker("filter", processThreads, fetcher::filter)
.addBuffer("fetch_queue", 1_000_000, 100)
.sinkTo("fetch", 1, prev -> {
@ -137,6 +138,7 @@ public class Wikidata {
});
ProgressLoggers loggers = ProgressLoggers.create()
.addRateCounter("blocks", fetcher.blocks)
.addRateCounter("nodes", fetcher.nodes, true)
.addRateCounter("ways", fetcher.ways, true)
.addRateCounter("rels", fetcher.rels, true)
@ -215,23 +217,31 @@ public class Wikidata {
}
/** Only pass elements that the profile cares about to next step in pipeline. */
private void filter(Iterable<ReaderElement> prev, Consumer<Long> next) {
for (ReaderElement elem : prev) {
switch (elem.getType()) {
case ReaderElement.NODE -> nodes.inc();
case ReaderElement.WAY -> ways.inc();
case ReaderElement.RELATION -> rels.inc();
}
Object wikidata = elem.getTag("wikidata");
if (wikidata instanceof String wikidataString) {
OsmElement osmElement = OsmElement.fromGraphhopper(elem);
if (profile.caresAboutWikidataTranslation(osmElement)) {
long qid = parseQid(wikidataString);
if (qid > 0) {
next.accept(qid);
private void filter(Iterable<OsmBlockSource.Block> prev, Consumer<Long> next) {
for (var block : prev) {
int blockNodes = 0, blockWays = 0, blockRelations = 0;
for (var elem : block.decodeElements()) {
if (elem instanceof OsmElement.Node) {
blockNodes++;
} else if (elem instanceof OsmElement.Way) {
blockWays++;
} else if (elem instanceof OsmElement.Relation) {
blockRelations++;
}
Object wikidata = elem.getString("wikidata");
if (wikidata instanceof String wikidataString) {
if (profile.caresAboutWikidataTranslation(elem)) {
long qid = parseQid(wikidataString);
if (qid > 0) {
next.accept(qid);
}
}
}
}
blocks.inc();
nodes.incBy(blockNodes);
ways.incBy(blockWays);
rels.incBy(blockRelations);
}
}

Wyświetl plik

@ -0,0 +1,114 @@
package com.onthegomap.planetiler.worker;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;
/**
* Redistributes work among worker threads when some finish early.
* <p>
* When a group of worker threads are processing large blocks, some may finish early, resulting in idle time at the end
* waiting for the "long pole in the tent" to finish:
*
* <pre>{@code
* busy idle | done
* worker1: ===========>xxxxxx|
* worker2: ===============>xx|
* worker3: =================>|
* worker4: =============>xxxx|
* }</pre>
* <p>
* This utility wraps the operation to perform on each element and then works through items in 3 phases:
*
* <ol>
* <li>If all threads are still busy, process it in the same thread</li>
* <li>If some threads are done, enqueue the item onto a work queue (but if it is full, just process it in the same thread)</li>
* <li>When the thread is done processing input elements, then process items off of the work queue until it is empty and all other workers are finished</li>
* </ol>
*
* @param <T> The type of element being processed
*/
@ThreadSafe
public class Distributor<T> {

  // Number of worker threads that have finished their own input (phase 3).
  private final AtomicInteger done = new AtomicInteger();
  // Number of worker threads still processing their own input (phases 1-2).
  private final AtomicInteger working = new AtomicInteger();
  // Items handed off by busy workers for early-finished workers to pick up.
  private final ArrayBlockingQueue<T> pending;

  private Distributor(int capacity) {
    pending = new ArrayBlockingQueue<>(capacity);
  }

  /** Returns a new {@code Distributor} that can hold up to {@code capacity} pending elements. */
  public static <T> Distributor<T> createWithCapacity(int capacity) {
    return new Distributor<>(capacity);
  }

  /** A handle for each worker thread to offer new items, and drain the remaining ones when done. */
  @NotThreadSafe
  public interface ForThread<T> extends Consumer<T>, AutoCloseable {

    /** Marks this worker as done producing new items; after this, {@link #accept} throws. */
    void finish();

    /** Processes queued items until the queue is empty and no other workers are still working. */
    void drain();

    @Override
    void close();
  }

  /**
   * Registers a new worker thread and returns the handle it uses to process items through
   * {@code consumer}, redistributing work to threads that finish early.
   */
  public ForThread<T> forThread(Consumer<T> consumer) {
    working.incrementAndGet();
    return new ForThread<>() {
      boolean finished = false;

      @Override
      public void accept(T t) {
        if (finished) {
          throw new IllegalStateException("Finished");
        }
        // Phase 1: all threads busy -> process locally. Phase 2: some thread finished
        // (done > 0) -> try to hand off; if the queue is full, process locally anyway.
        if (done.get() == 0 || !pending.offer(t)) {
          consumer.accept(t);
        }
      }

      @Override
      public void finish() {
        // Idempotent: only flip the counters the first time.
        if (!finished) {
          done.incrementAndGet();
          working.decrementAndGet();
          finished = true;
        }
      }

      @Override
      public void drain() {
        // Phase 3: keep pulling queued items until the queue is empty AND no thread is
        // still producing new ones.
        T item;
        while ((item = pending.poll()) != null || working.get() > 0) {
          if (item == null) {
            try {
              // Nothing queued yet but workers are still busy: wait briefly, then re-check
              // in case the last worker finished without queueing anything.
              item = pending.poll(100, TimeUnit.MILLISECONDS);
              if (item == null && working.get() <= 0) {
                break;
              }
            } catch (InterruptedException e) {
              // Restore the interrupt flag and stop draining.
              Thread.currentThread().interrupt();
              break;
            }
          }
          if (item != null) {
            consumer.accept(item);
          }
        }
      }

      @Override
      public void close() {
        finish();
        drain();
      }
    };
  }
}

Wyświetl plik

@ -0,0 +1,107 @@
package com.onthegomap.planetiler.worker;
import com.onthegomap.planetiler.collection.IterableOnce;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* A high-performance blocking queue to hand off work from a single producing thread to single consuming thread.
* <p>
* Each element has a weight and each batch has a target maximum weight to put more lightweight objects in a batch or
* fewer heavy-weight ones.
*
* @param <T> the type of elements held in this queue
*/
public class WeightedHandoffQueue<T> implements AutoCloseable, IterableOnce<T> {

  // Sentinel batch enqueued by close() to signal end-of-input to the reading thread.
  private final Queue<T> DONE = new ArrayDeque<>(0);
  // Bounded queue of batches handed off from the writer thread to the reader thread.
  private final BlockingQueue<Queue<T>> itemQueue;
  // Maximum accumulated weight of a batch before it is flushed to the queue.
  private final int writeLimit;
  // Set on the reader side once the DONE sentinel has been observed.
  private boolean done = false;
  // Accumulated weight of the batch currently being assembled by the writer.
  private int writeCost = 0;
  // Batch under construction on the writer thread (null until the first accept).
  Queue<T> writeBatch = null;
  // Batch currently being consumed on the reader thread.
  Queue<T> readBatch = null;

  /**
   * Creates a new {@code WeightedHandoffQueue} with {@code outer} maximum number of pending batches and {@code inner}
   * maximum batch weight.
   */
  public WeightedHandoffQueue(int outer, int inner) {
    this.writeLimit = inner;
    itemQueue = new ArrayBlockingQueue<>(outer);
  }

  @Override
  public void close() {
    try {
      // Flush any partial batch, then hand the reader the end-of-input sentinel.
      flushWrites();
      itemQueue.put(DONE);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Adds {@code item} with weight {@code cost} to the current batch, flushing the batch to the
   * reader once its accumulated weight reaches the limit. Called from the writer thread only.
   */
  public void accept(T item, int cost) {
    if (writeBatch == null) {
      writeBatch = new ArrayDeque<>(writeLimit / 2);
    }
    writeCost += cost;
    writeBatch.offer(item);
    if (writeCost >= writeLimit) {
      flushWrites();
    }
  }

  // Hands the in-progress batch to the reader, blocking if the batch queue is full.
  private void flushWrites() {
    if (writeBatch != null && !writeBatch.isEmpty()) {
      try {
        // Reset writer state before the potentially-blocking put().
        Queue<T> oldWriteBatch = writeBatch;
        writeBatch = null;
        writeCost = 0;
        itemQueue.put(oldWriteBatch);
      } catch (InterruptedException ex) {
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Returns the next item, blocking until one is available, or {@code null} once the writer has
   * closed the queue and every item has been consumed. Called from the reader thread only.
   */
  @Override
  public T get() {
    Queue<T> itemBatch = readBatch;

    if (itemBatch == null || itemBatch.isEmpty()) {
      do {
        // Writer already signaled completion and nothing is pending: report EOF below.
        if (done && itemQueue.isEmpty()) {
          break;
        }

        if ((itemBatch = itemQueue.poll()) == null) {
          try {
            // No batch ready yet: wait briefly so the done flag can be re-checked.
            itemBatch = itemQueue.poll(100, TimeUnit.MILLISECONDS);
            if (itemBatch != null) {
              if (itemBatch == DONE) {
                done = true;
              }
              break;
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;// signal EOF
          }
        } else if (itemBatch == DONE) {
          done = true;
        }
      } while (itemBatch == null);
      readBatch = itemBatch;
    }
    // The DONE sentinel is an empty batch, so poll() on it naturally yields null (EOF).
    return itemBatch == null ? null : itemBatch.poll();
  }
}

Wyświetl plik

@ -6,10 +6,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.graphhopper.reader.ReaderElement;
import com.graphhopper.reader.ReaderNode;
import com.graphhopper.reader.ReaderRelation;
import com.graphhopper.reader.ReaderWay;
import com.onthegomap.planetiler.collection.FeatureGroup;
import com.onthegomap.planetiler.collection.LongLongMap;
import com.onthegomap.planetiler.config.Arguments;
@ -23,10 +19,10 @@ import com.onthegomap.planetiler.mbtiles.MbtilesWriter;
import com.onthegomap.planetiler.reader.SimpleFeature;
import com.onthegomap.planetiler.reader.SimpleReader;
import com.onthegomap.planetiler.reader.SourceFeature;
import com.onthegomap.planetiler.reader.osm.OsmBlockSource;
import com.onthegomap.planetiler.reader.osm.OsmElement;
import com.onthegomap.planetiler.reader.osm.OsmReader;
import com.onthegomap.planetiler.reader.osm.OsmRelationInfo;
import com.onthegomap.planetiler.reader.osm.OsmSource;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.worker.WorkerPipeline;
import java.io.IOException;
@ -77,7 +73,7 @@ public class PlanetilerTests {
private static final int Z4_TILES = 1 << 4;
private final Stats stats = Stats.inMemory();
private static <T extends ReaderElement> T with(T elem, Consumer<T> fn) {
private static <T extends OsmElement> T with(T elem, Consumer<T> fn) {
fn.accept(elem);
return elem;
}
@ -103,16 +99,16 @@ public class PlanetilerTests {
}
private void processOsmFeatures(FeatureGroup featureGroup, Profile profile, PlanetilerConfig config,
List<? extends ReaderElement> osmElements) throws IOException {
OsmSource elems = (name, threads) -> next -> {
List<? extends OsmElement> osmElements) throws IOException {
OsmBlockSource elems = next -> {
// process the same order they come in from an OSM file
osmElements.stream().filter(e -> e.getType() == ReaderElement.FILEHEADER).forEachOrdered(next);
osmElements.stream().filter(e -> e.getType() == ReaderElement.NODE).forEachOrdered(next);
osmElements.stream().filter(e -> e.getType() == ReaderElement.WAY).forEachOrdered(next);
osmElements.stream().filter(e -> e.getType() == ReaderElement.RELATION).forEachOrdered(next);
next.accept(OsmBlockSource.Block.of(osmElements.stream().filter(e -> e instanceof OsmElement.Other).toList()));
next.accept(OsmBlockSource.Block.of(osmElements.stream().filter(e -> e instanceof OsmElement.Node).toList()));
next.accept(OsmBlockSource.Block.of(osmElements.stream().filter(e -> e instanceof OsmElement.Way).toList()));
next.accept(OsmBlockSource.Block.of(osmElements.stream().filter(e -> e instanceof OsmElement.Relation).toList()));
};
var nodeMap = LongLongMap.newInMemorySortedTable();
try (var reader = new OsmReader("osm", elems, nodeMap, profile, Stats.inMemory())) {
try (var reader = new OsmReader("osm", () -> elems, nodeMap, profile, Stats.inMemory())) {
reader.pass1(config);
reader.pass2(featureGroup, config);
}
@ -175,7 +171,7 @@ public class PlanetilerTests {
private PlanetilerResults runWithOsmElements(
Map<String, String> args,
List<ReaderElement> features,
List<OsmElement> features,
BiConsumer<SourceFeature, FeatureCollector> profileFunction
) throws Exception {
return run(
@ -187,7 +183,7 @@ public class PlanetilerTests {
private PlanetilerResults runWithOsmElements(
Map<String, String> args,
List<ReaderElement> features,
List<OsmElement> features,
Profile profileToUse
) throws Exception {
return run(
@ -199,7 +195,7 @@ public class PlanetilerTests {
private PlanetilerResults runWithOsmElements(
Map<String, String> args,
List<ReaderElement> features,
List<OsmElement> features,
Function<OsmElement.Relation, List<OsmRelationInfo>> preprocessOsmRelation,
BiConsumer<SourceFeature, FeatureCollector> profileFunction
) throws Exception {
@ -745,7 +741,7 @@ public class PlanetilerTests {
var results = runWithOsmElements(
Map.of("threads", "1"),
List.of(
with(new ReaderNode(1, 0, 0), t -> t.setTag("attr", "value"))
with(new OsmElement.Node(1, 0, 0), t -> t.setTag("attr", "value"))
),
(in, features) -> {
if (in.isPoint()) {
@ -772,18 +768,20 @@ public class PlanetilerTests {
var results = run(
Map.of("threads", "1"),
(featureGroup, profile, config) -> {
List<? extends ReaderElement> osmElements = List.<ReaderElement>of(
with(new ReaderNode(1, 0, 0), t -> t.setTag("attr", "value"))
List<? extends OsmElement> osmElements = List.<OsmElement>of(
with(new OsmElement.Node(1, 0, 0), t -> t.setTag("attr", "value"))
);
OsmSource elems = (name, threads) -> next -> {
OsmBlockSource elems = next -> {
// process the same order they come in from an OSM file
osmElements.stream().filter(e -> e.getType() == ReaderElement.FILEHEADER).forEachOrdered(next);
osmElements.stream().filter(e -> e.getType() == ReaderElement.NODE).forEachOrdered(next);
osmElements.stream().filter(e -> e.getType() == ReaderElement.WAY).forEachOrdered(next);
osmElements.stream().filter(e -> e.getType() == ReaderElement.RELATION).forEachOrdered(next);
next.accept(
OsmBlockSource.Block.of(osmElements.stream().filter(e -> e instanceof OsmElement.Other).toList()));
next.accept(OsmBlockSource.Block.of(osmElements.stream().filter(e -> e instanceof OsmElement.Node).toList()));
next.accept(OsmBlockSource.Block.of(osmElements.stream().filter(e -> e instanceof OsmElement.Way).toList()));
next.accept(
OsmBlockSource.Block.of(osmElements.stream().filter(e -> e instanceof OsmElement.Relation).toList()));
};
var nodeMap = LongLongMap.newInMemorySortedTable();
try (var reader = new OsmReader("osm", elems, nodeMap, profile, Stats.inMemory())) {
try (var reader = new OsmReader("osm", () -> elems, nodeMap, profile, Stats.inMemory())) {
// skip pass 1
reader.pass2(featureGroup, config);
}
@ -813,7 +811,7 @@ public class PlanetilerTests {
assertThrows(RuntimeException.class, () -> runWithOsmElements(
Map.of("threads", "1"),
List.of(
with(new ReaderNode(1, 0, 0), t -> t.setTag("attr", "value"))
with(new OsmElement.Node(1, 0, 0), t -> t.setTag("attr", "value"))
),
(in, features) -> {
throw new Error();
@ -826,11 +824,11 @@ public class PlanetilerTests {
var results = runWithOsmElements(
Map.of("threads", "1"),
List.of(
new ReaderNode(1, 0, 0),
new ReaderNode(2, GeoUtils.getWorldLat(0.75), GeoUtils.getWorldLon(0.75)),
with(new ReaderWay(3), way -> {
new OsmElement.Node(1, 0, 0),
new OsmElement.Node(2, GeoUtils.getWorldLat(0.75), GeoUtils.getWorldLon(0.75)),
with(new OsmElement.Way(3), way -> {
way.setTag("attr", "value");
way.getNodes().add(1, 2);
way.nodes().add(1, 2);
})
),
(in, features) -> {
@ -858,13 +856,13 @@ public class PlanetilerTests {
var results = runWithOsmElements(
Map.of("threads", "1"),
List.of(
new ReaderNode(1, GeoUtils.getWorldLat(0.25), GeoUtils.getWorldLon(0.25)),
new ReaderNode(2, GeoUtils.getWorldLat(0.25), GeoUtils.getWorldLon(0.75)),
new ReaderNode(3, GeoUtils.getWorldLat(0.75), GeoUtils.getWorldLon(0.75)),
new ReaderNode(4, GeoUtils.getWorldLat(0.75), GeoUtils.getWorldLon(0.25)),
with(new ReaderWay(6), way -> {
new OsmElement.Node(1, GeoUtils.getWorldLat(0.25), GeoUtils.getWorldLon(0.25)),
new OsmElement.Node(2, GeoUtils.getWorldLat(0.25), GeoUtils.getWorldLon(0.75)),
new OsmElement.Node(3, GeoUtils.getWorldLat(0.75), GeoUtils.getWorldLon(0.75)),
new OsmElement.Node(4, GeoUtils.getWorldLat(0.75), GeoUtils.getWorldLon(0.25)),
with(new OsmElement.Way(6), way -> {
way.setTag("attr", "value");
way.getNodes().add(1, 2, 3, 4, 1);
way.nodes().add(1, 2, 3, 4, 1);
})
),
(in, features) -> {
@ -910,38 +908,38 @@ public class PlanetilerTests {
var results = runWithOsmElements(
Map.of("threads", "1"),
List.of(
new ReaderNode(1, GeoUtils.getWorldLat(0.125), GeoUtils.getWorldLon(0.125)),
new ReaderNode(2, GeoUtils.getWorldLat(0.125), GeoUtils.getWorldLon(0.875)),
new ReaderNode(3, GeoUtils.getWorldLat(0.875), GeoUtils.getWorldLon(0.875)),
new ReaderNode(4, GeoUtils.getWorldLat(0.875), GeoUtils.getWorldLon(0.125)),
new OsmElement.Node(1, GeoUtils.getWorldLat(0.125), GeoUtils.getWorldLon(0.125)),
new OsmElement.Node(2, GeoUtils.getWorldLat(0.125), GeoUtils.getWorldLon(0.875)),
new OsmElement.Node(3, GeoUtils.getWorldLat(0.875), GeoUtils.getWorldLon(0.875)),
new OsmElement.Node(4, GeoUtils.getWorldLat(0.875), GeoUtils.getWorldLon(0.125)),
new ReaderNode(5, GeoUtils.getWorldLat(0.25), GeoUtils.getWorldLon(0.25)),
new ReaderNode(6, GeoUtils.getWorldLat(0.25), GeoUtils.getWorldLon(0.75)),
new ReaderNode(7, GeoUtils.getWorldLat(0.75), GeoUtils.getWorldLon(0.75)),
new ReaderNode(8, GeoUtils.getWorldLat(0.75), GeoUtils.getWorldLon(0.25)),
new OsmElement.Node(5, GeoUtils.getWorldLat(0.25), GeoUtils.getWorldLon(0.25)),
new OsmElement.Node(6, GeoUtils.getWorldLat(0.25), GeoUtils.getWorldLon(0.75)),
new OsmElement.Node(7, GeoUtils.getWorldLat(0.75), GeoUtils.getWorldLon(0.75)),
new OsmElement.Node(8, GeoUtils.getWorldLat(0.75), GeoUtils.getWorldLon(0.25)),
new ReaderNode(9, GeoUtils.getWorldLat(0.375), GeoUtils.getWorldLon(0.375)),
new ReaderNode(10, GeoUtils.getWorldLat(0.375), GeoUtils.getWorldLon(0.625)),
new ReaderNode(11, GeoUtils.getWorldLat(0.625), GeoUtils.getWorldLon(0.625)),
new ReaderNode(12, GeoUtils.getWorldLat(0.625), GeoUtils.getWorldLon(0.375)),
new ReaderNode(13, GeoUtils.getWorldLat(0.375 + 1e-12), GeoUtils.getWorldLon(0.375)),
new OsmElement.Node(9, GeoUtils.getWorldLat(0.375), GeoUtils.getWorldLon(0.375)),
new OsmElement.Node(10, GeoUtils.getWorldLat(0.375), GeoUtils.getWorldLon(0.625)),
new OsmElement.Node(11, GeoUtils.getWorldLat(0.625), GeoUtils.getWorldLon(0.625)),
new OsmElement.Node(12, GeoUtils.getWorldLat(0.625), GeoUtils.getWorldLon(0.375)),
new OsmElement.Node(13, GeoUtils.getWorldLat(0.375 + 1e-12), GeoUtils.getWorldLon(0.375)),
with(new ReaderWay(14), way -> way.getNodes().add(1, 2, 3, 4, 1)),
with(new ReaderWay(15), way -> way.getNodes().add(5, 6, 7, 8, 5)),
with(new ReaderWay(16), way -> way.getNodes().add(9, 10, 11, 12, 13)),
with(new OsmElement.Way(14), way -> way.nodes().add(1, 2, 3, 4, 1)),
with(new OsmElement.Way(15), way -> way.nodes().add(5, 6, 7, 8, 5)),
with(new OsmElement.Way(16), way -> way.nodes().add(9, 10, 11, 12, 13)),
with(new ReaderRelation(17), rel -> {
with(new OsmElement.Relation(17), rel -> {
rel.setTag("type", relationType);
rel.setTag("attr", "value");
rel.setTag("should_emit", "yes");
rel.add(new ReaderRelation.Member(ReaderRelation.Member.WAY, 14, "outer"));
rel.add(new ReaderRelation.Member(ReaderRelation.Member.WAY, 15, null)); // missing
rel.add(new ReaderRelation.Member(ReaderRelation.Member.WAY, 16, "inner")); // incorrect
rel.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, 14, "outer"));
rel.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, 15, null)); // missing
rel.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, 16, "inner")); // incorrect
}),
with(new ReaderRelation(18), rel -> {
with(new OsmElement.Relation(18), rel -> {
rel.setTag("type", "relation");
rel.setTag("name", "rel name");
rel.add(new ReaderRelation.Member(ReaderRelation.Member.WAY, 17, "outer"));
rel.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, 17, "outer"));
})
),
in -> in.hasTag("type", "relation") ?
@ -984,21 +982,21 @@ public class PlanetilerTests {
var results = runWithOsmElements(
Map.of("threads", "1"),
List.of(
new ReaderNode(1, 0, 0),
new ReaderNode(2, GeoUtils.getWorldLat(0.375), 0),
new ReaderNode(3, GeoUtils.getWorldLat(0.25), 0),
new ReaderNode(4, GeoUtils.getWorldLat(0.125), 0),
with(new ReaderWay(5), way -> {
new OsmElement.Node(1, 0, 0),
new OsmElement.Node(2, GeoUtils.getWorldLat(0.375), 0),
new OsmElement.Node(3, GeoUtils.getWorldLat(0.25), 0),
new OsmElement.Node(4, GeoUtils.getWorldLat(0.125), 0),
with(new OsmElement.Way(5), way -> {
way.setTag("attr", "value1");
way.getNodes().add(1, 2);
way.nodes().add(1, 2);
}),
with(new ReaderWay(6), way -> {
with(new OsmElement.Way(6), way -> {
way.setTag("attr", "value2");
way.getNodes().add(3, 4);
way.nodes().add(3, 4);
}),
with(new ReaderRelation(6), rel -> {
with(new OsmElement.Relation(6), rel -> {
rel.setTag("name", "relation name");
rel.add(new ReaderRelation.Member(ReaderRelation.WAY, 6, "role"));
rel.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, 6, "role"));
})
),
(relation) -> {
@ -1063,13 +1061,13 @@ public class PlanetilerTests {
var results = run(
Map.of("threads", "1"),
(featureGroup, p, config) -> processOsmFeatures(featureGroup, p, config, List.of(
with(new ReaderNode(1, 0, 0), node -> node.setTag("a", "b")),
new ReaderNode(2, GeoUtils.getWorldLat(0.375), 0),
with(new ReaderWay(3), way -> {
way.getNodes().add(1, 2);
with(new OsmElement.Node(1, 0, 0), node -> node.setTag("a", "b")),
new OsmElement.Node(2, GeoUtils.getWorldLat(0.375), 0),
with(new OsmElement.Way(3), way -> {
way.nodes().add(1, 2);
}),
with(new ReaderWay(4), way -> {
way.getNodes().add(1, 2);
with(new OsmElement.Way(4), way -> {
way.nodes().add(1, 2);
})
)),
profile
@ -1352,8 +1350,8 @@ public class PlanetilerTests {
var results = runWithOsmElements(
Map.of("threads", "1"),
List.of(
with(new ReaderNode(1, lat, lng1), t -> t.setTag("a", 1)),
with(new ReaderNode(2, lat, lng2), t -> t.setTag("a", 3))
with(new OsmElement.Node(1, lat, lng1), t -> t.setTag("a", 1)),
with(new OsmElement.Node(2, lat, lng2), t -> t.setTag("a", 3))
),
new Profile.NullProfile() {
private final List<SourceFeature> featureList = new CopyOnWriteArrayList<>();
@ -1428,8 +1426,8 @@ public class PlanetilerTests {
var results = runWithOsmElements(
Map.of("threads", "1"),
List.of(
with(new ReaderNode(1, lat, lng1), t -> t.setTag("a", 1)),
with(new ReaderNode(2, lat, lng2), t -> t.setTag("a", 3))
with(new OsmElement.Node(1, lat, lng1), t -> t.setTag("a", 1)),
with(new OsmElement.Node(2, lat, lng2), t -> t.setTag("a", 3))
),
profile
);
@ -1530,31 +1528,31 @@ public class PlanetilerTests {
public void testBadRelation() throws Exception {
// this threw an exception in OsmMultipolygon.build
OsmXml osmInfo = TestUtils.readOsmXml("bad_spain_relation.xml");
List<ReaderElement> elements = new ArrayList<>();
List<OsmElement> elements = new ArrayList<>();
for (var node : orEmpty(osmInfo.nodes())) {
elements.add(new ReaderNode(node.id(), node.lat(), node.lon()));
elements.add(new OsmElement.Node(node.id(), node.lat(), node.lon()));
}
for (var way : orEmpty(osmInfo.ways())) {
ReaderWay readerWay = new ReaderWay(way.id());
var readerWay = new OsmElement.Way(way.id());
elements.add(readerWay);
for (var tag : orEmpty(way.tags())) {
readerWay.setTag(tag.k(), tag.v());
}
for (var nodeRef : orEmpty(way.nodeRefs())) {
readerWay.getNodes().add(nodeRef.ref());
readerWay.nodes().add(nodeRef.ref());
}
}
for (var relation : orEmpty(osmInfo.relation())) {
ReaderRelation readerRelation = new ReaderRelation(relation.id());
var readerRelation = new OsmElement.Relation(relation.id());
elements.add(readerRelation);
for (var tag : orEmpty(relation.tags())) {
readerRelation.setTag(tag.k(), tag.v());
}
for (var member : orEmpty(relation.members())) {
readerRelation.add(new ReaderRelation.Member(switch (member.type()) {
case "way" -> ReaderRelation.Member.WAY;
case "relation" -> ReaderRelation.Member.RELATION;
case "node" -> ReaderRelation.Member.NODE;
readerRelation.members().add(new OsmElement.Relation.Member(switch (member.type()) {
case "way" -> OsmElement.Type.WAY;
case "relation" -> OsmElement.Type.RELATION;
case "node" -> OsmElement.Type.NODE;
default -> throw new IllegalStateException("Unexpected value: " + member.type());
}, member.ref(), member.role()));
}
@ -1613,10 +1611,17 @@ public class PlanetilerTests {
var results = runWithOsmElements(
Map.of("threads", "1"),
List.of(
with(new ReaderNode(1, 0, 0), t -> t.setTag("attr", "value"))
with(new OsmElement.Node(1, 0, 0), t -> t.setTag("attr", "value"))
),
(in, features) -> {
throw new IllegalStateException("intentional exception!");
throw new IllegalStateException("intentional exception!") {
// suppress stack trace in logs
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
};
}
);

Wyświetl plik

@ -1,9 +1,6 @@
package com.onthegomap.planetiler.collection;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;
import java.util.ArrayList;
import java.util.HashSet;
@ -11,6 +8,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
public class IterableOnceTest {
@ -91,6 +89,22 @@ public class IterableOnceTest {
iters++;
}
assertEquals(List.of(1, 2, 3, 4), result.stream().sorted().toList());
assertEquals(3, iters);
assertEquals(2, iters);
}
@Test
public void testWaitsToCallNext() {
var iter = Stream.of(1, 2).peek(i -> {
if (i == 2) {
throw new Error();
}
}).iterator();
IterableOnce<Integer> items = iter::next;
var iter2 = items.iterator();
assertTrue(iter2.hasNext());
assertEquals(1, iter2.next());
assertThrows(Error.class, () -> {
iter2.hasNext();
});
}
}

Wyświetl plik

@ -1,60 +0,0 @@
package com.onthegomap.planetiler.reader.osm;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.carrotsearch.hppc.LongArrayList;
import com.graphhopper.reader.ReaderNode;
import com.graphhopper.reader.ReaderRelation;
import com.graphhopper.reader.ReaderWay;
import com.graphhopper.reader.osm.OSMFileHeader;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
/**
 * Unit tests for converting graphhopper reader elements into planetiler's own
 * {@code OsmElement} model via {@code OsmElement.fromGraphhopper}.
 */
public class OsmElementTest {

  /** A file header carries no tags, so it maps to an {@code Other} element with id 0 and no tags. */
  @Test
  public void testFileheader() {
    OSMFileHeader header = new OSMFileHeader();
    assertEquals(new OsmElement.Other(0, Map.of()), OsmElement.fromGraphhopper(header));
  }

  /** Node conversion preserves id, latitude, longitude, and tags. */
  @Test
  public void testNode() {
    ReaderNode input = new ReaderNode(1, 2, 3);
    input.setTag("a", "b");
    OsmElement.Node node = (OsmElement.Node) OsmElement.fromGraphhopper(input);
    assertEquals(1, node.id());
    assertEquals(2, node.lat());
    assertEquals(3, node.lon());
    assertEquals(Map.of("a", "b"), node.tags());
  }

  /** Way conversion preserves id, the ordered node-ref list, and tags. */
  @Test
  public void testWay() {
    ReaderWay input = new ReaderWay(1);
    input.setTag("a", "b");
    input.getNodes().add(1, 2, 3);
    OsmElement.Way way = (OsmElement.Way) OsmElement.fromGraphhopper(input);
    assertEquals(1, way.id());
    assertEquals(LongArrayList.from(1, 2, 3), way.nodes());
    assertEquals(Map.of("a", "b"), way.tags());
  }

  /**
   * Relation conversion preserves id and tags, and maps each graphhopper member
   * (int type constant, ref, role) to a typed {@code OsmElement.Relation.Member}
   * in the original order.
   */
  @Test
  public void testRelation() {
    ReaderRelation input = new ReaderRelation(1);
    input.setTag("a", "b");
    input.add(new ReaderRelation.Member(ReaderRelation.Member.NODE, 1, "noderole"));
    input.add(new ReaderRelation.Member(ReaderRelation.Member.WAY, 2, "wayrole"));
    input.add(new ReaderRelation.Member(ReaderRelation.Member.RELATION, 3, "relationrole"));
    OsmElement.Relation relation = (OsmElement.Relation) OsmElement.fromGraphhopper(input);
    assertEquals(1, relation.id());
    assertEquals(Map.of("a", "b"), relation.tags());
    assertEquals(List.of(
      new OsmElement.Relation.Member(OsmElement.Type.NODE, 1, "noderole"),
      new OsmElement.Relation.Member(OsmElement.Type.WAY, 2, "wayrole"),
      new OsmElement.Relation.Member(OsmElement.Type.RELATION, 3, "relationrole")
    ), relation.members());
  }
}

Wyświetl plik

@ -2,44 +2,123 @@ package com.onthegomap.planetiler.reader.osm;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.graphhopper.reader.ReaderElement;
import com.carrotsearch.hppc.LongArrayList;
import com.onthegomap.planetiler.TestUtils;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.worker.WorkerPipeline;
import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.locationtech.jts.geom.Envelope;
public class OsmInputFileTest {
private final OsmInputFile file = new OsmInputFile(TestUtils.pathToResource("monaco-latest.osm.pbf"));
private final Path path = TestUtils.pathToResource("monaco-latest.osm.pbf");
private final OsmElement.Node expectedNode = new OsmElement.Node(1737114566L, Map.of(
"highway", "crossing",
"crossing", "zebra"
), 43.7409723, 7.4303278);
private final OsmElement.Way expectedWay = new OsmElement.Way(4097656L, Map.of(
"name", "Avenue Princesse Alice",
"lanes", "2",
"maxspeed", "30",
"highway", "primary",
"surface", "asphalt",
"lit", "yes"
), LongArrayList.from(
21912089L, 7265761724L, 1079750744L, 2104793864L, 6340961560L, 1110560507L, 21912093L, 6340961559L, 21912095L,
7265762803L, 2104793866L, 6340961561L, 5603088200L, 6340961562L, 21912097L, 21912099L
));
private final OsmElement.Relation expectedRel = new OsmElement.Relation(7360630L, Map.of(
"local_ref", "Saint-Roman",
"public_transport:version", "2",
"name", "Saint-Roman",
"public_transport", "stop_area",
"type", "public_transport",
"operator", "Compagnie des Autobus de Monaco",
"network", "Autobus de Monaco"
), List.of(
new OsmElement.Relation.Member(OsmElement.Type.WAY, 503638817L, "platform"),
new OsmElement.Relation.Member(OsmElement.Type.WAY, 503638816L, "platform"),
new OsmElement.Relation.Member(OsmElement.Type.NODE, 4939122054L, "platform"),
new OsmElement.Relation.Member(OsmElement.Type.NODE, 3465728159L, "stop"),
new OsmElement.Relation.Member(OsmElement.Type.NODE, 4939122068L, "platform"),
new OsmElement.Relation.Member(OsmElement.Type.NODE, 3805333988L, "stop")
));
private final Envelope expectedBounds = new Envelope(7.409205, 7.448637, 43.72335, 43.75169);
@Test
public void testGetBounds() {
assertEquals(new Envelope(7.409205, 7.448637, 43.72335, 43.75169), file.getLatLonBounds());
assertEquals(expectedBounds, new OsmInputFile(path).getLatLonBounds());
}
@Test
public void testGetHeader() {
assertEquals(new OsmHeader(
expectedBounds,
List.of("OsmSchema-V0.6", "DenseNodes"),
List.of(),
"osmium/1.8.0",
"",
Instant.parse("2021-04-21T20:21:46Z"),
2947,
"http://download.geofabrik.de/europe/monaco-updates"
),
new OsmInputFile(path).getHeader()
);
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
@Timeout(30)
public void testReadMonacoTwice() {
public void testReadMonacoTwice(boolean lazy) {
for (int i = 1; i <= 2; i++) {
AtomicInteger nodes = new AtomicInteger(0);
AtomicInteger ways = new AtomicInteger(0);
AtomicInteger rels = new AtomicInteger(0);
WorkerPipeline.start("test", Stats.inMemory())
.fromGenerator("pbf", file.read("test", 2))
.addBuffer("reader_queue", 1_000, 100)
.sinkToConsumer("counter", 1, elem -> {
switch (elem.getType()) {
case ReaderElement.NODE -> nodes.incrementAndGet();
case ReaderElement.WAY -> ways.incrementAndGet();
case ReaderElement.RELATION -> rels.incrementAndGet();
}
}).await();
assertEquals(25_423, nodes.get(), "nodes pass " + i);
assertEquals(4_106, ways.get(), "ways pass " + i);
assertEquals(243, rels.get(), "rels pass " + i);
AtomicReference<OsmElement.Node> node = new AtomicReference<>();
AtomicReference<OsmElement.Way> way = new AtomicReference<>();
AtomicReference<OsmElement.Relation> rel = new AtomicReference<>();
var file = new OsmInputFile(path, lazy);
try (var osmReader = file.get()) {
WorkerPipeline.start("test", Stats.inMemory())
.fromGenerator("pbf", osmReader::forEachBlock)
.addBuffer("pbf_blocks", 100)
.sinkToConsumer("counter", 1, block -> {
for (var elem : block.decodeElements()) {
if (elem instanceof OsmElement.Node n) {
if (n.id() == expectedNode.id()) {
node.set(n);
}
nodes.incrementAndGet();
} else if (elem instanceof OsmElement.Way w) {
if (w.id() == expectedWay.id()) {
way.set(w);
}
ways.incrementAndGet();
} else if (elem instanceof OsmElement.Relation r) {
if (r.id() == expectedRel.id()) {
rel.set(r);
}
rels.incrementAndGet();
}
}
}).await();
assertEquals(25_423, nodes.get(), "nodes pass " + i);
assertEquals(4_106, ways.get(), "ways pass " + i);
assertEquals(243, rels.get(), "rels pass " + i);
assertEquals(expectedNode, node.get());
assertEquals(expectedWay, way.get());
assertEquals(expectedRel, rel.get());
}
}
}
}

Wyświetl plik

@ -6,10 +6,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.graphhopper.reader.ReaderElement;
import com.graphhopper.reader.ReaderNode;
import com.graphhopper.reader.ReaderRelation;
import com.graphhopper.reader.ReaderWay;
import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.TestUtils;
import com.onthegomap.planetiler.collection.LongLongMap;
@ -27,18 +23,19 @@ import org.junit.jupiter.params.provider.ValueSource;
public class OsmReaderTest {
public final OsmSource osmSource = (name, threads) -> next -> {
public final OsmBlockSource osmSource = next -> {
};
private final Stats stats = Stats.inMemory();
private final Profile profile = new Profile.NullProfile();
private final LongLongMap nodeMap = LongLongMap.newInMemorySortedTable();
@Test
public void testPoint() throws GeometryException {
OsmReader reader = newOsmReader();
var node = new ReaderNode(1, 0, 0);
var node = new OsmElement.Node(1, 0, 0);
node.setTag("key", "value");
reader.processPass1Element(node);
reader.processPass1Block(List.of(node));
SourceFeature feature = reader.processNodePass2(node);
assertTrue(feature.isPoint());
assertFalse(feature.canBePolygon());
@ -61,15 +58,13 @@ public class OsmReaderTest {
public void testLine() throws GeometryException {
OsmReader reader = newOsmReader();
var nodeCache = reader.newNodeLocationProvider();
var node1 = new ReaderNode(1, 0, 0);
var node1 = new OsmElement.Node(1, 0, 0);
var node2 = node(2, 0.75, 0.75);
var way = new ReaderWay(3);
way.getNodes().add(node1.getId(), node2.getId());
var way = new OsmElement.Way(3);
way.nodes().add(node1.id(), node2.id());
way.setTag("key", "value");
reader.processPass1Element(node1);
reader.processPass1Element(node2);
reader.processPass1Element(way);
reader.processPass1Block(List.of(node1, node2, way));
SourceFeature feature = reader.processWayPass2(way, nodeCache);
assertTrue(feature.canBeLine());
@ -105,15 +100,11 @@ public class OsmReaderTest {
var node2 = node(2, 0.5, 0.75);
var node3 = node(3, 0.75, 0.75);
var node4 = node(4, 0.75, 0.5);
var way = new ReaderWay(3);
way.getNodes().add(1, 2, 3, 4, 1);
var way = new OsmElement.Way(3);
way.nodes().add(1, 2, 3, 4, 1);
way.setTag("key", "value");
reader.processPass1Element(node1);
reader.processPass1Element(node2);
reader.processPass1Element(node3);
reader.processPass1Element(node4);
reader.processPass1Element(way);
reader.processPass1Block(List.of(node1, node2, node3, node4, way));
SourceFeature feature = reader.processWayPass2(way, nodeCache);
assertTrue(feature.canBeLine());
@ -148,15 +139,11 @@ public class OsmReaderTest {
var node2 = node(2, 0.5, 0.75);
var node3 = node(3, 0.75, 0.75);
var node4 = node(4, 0.75, 0.5);
var way = new ReaderWay(3);
way.getNodes().add(1, 2, 3, 4, 1);
var way = new OsmElement.Way(3);
way.nodes().add(1, 2, 3, 4, 1);
way.setTag("area", "yes");
reader.processPass1Element(node1);
reader.processPass1Element(node2);
reader.processPass1Element(node3);
reader.processPass1Element(node4);
reader.processPass1Element(way);
reader.processPass1Block(List.of(node1, node2, node3, node4, way));
SourceFeature feature = reader.processWayPass2(way, nodeCache);
assertFalse(feature.canBeLine());
@ -188,15 +175,11 @@ public class OsmReaderTest {
var node2 = node(2, 0.5, 0.75);
var node3 = node(3, 0.75, 0.75);
var node4 = node(4, 0.75, 0.5);
var way = new ReaderWay(5);
way.getNodes().add(1, 2, 3, 4, 1);
var way = new OsmElement.Way(5);
way.nodes().add(1, 2, 3, 4, 1);
way.setTag("area", "no");
reader.processPass1Element(node1);
reader.processPass1Element(node2);
reader.processPass1Element(node3);
reader.processPass1Element(node4);
reader.processPass1Element(way);
reader.processPass1Block(List.of(node1, node2, node3, node4, way));
SourceFeature feature = reader.processWayPass2(way, nodeCache);
assertTrue(feature.canBeLine());
@ -224,11 +207,10 @@ public class OsmReaderTest {
public void testLineWithTooFewPoints() throws GeometryException {
OsmReader reader = newOsmReader();
var node1 = node(1, 0.5, 0.5);
var way = new ReaderWay(3);
way.getNodes().add(1);
var way = new OsmElement.Way(3);
way.nodes().add(1);
reader.processPass1Element(node1);
reader.processPass1Element(way);
reader.processPass1Block(List.of(node1, way));
SourceFeature feature = reader.processWayPass2(way, reader.newNodeLocationProvider());
assertFalse(feature.canBeLine());
@ -250,12 +232,10 @@ public class OsmReaderTest {
OsmReader reader = newOsmReader();
var node1 = node(1, 0.5, 0.5);
var node2 = node(2, 0.5, 0.75);
var way = new ReaderWay(3);
way.getNodes().add(1, 2, 1);
var way = new OsmElement.Way(3);
way.nodes().add(1, 2, 1);
reader.processPass1Element(node1);
reader.processPass1Element(node2);
reader.processPass1Element(way);
reader.processPass1Block(List.of(node1, node2, way));
SourceFeature feature = reader.processWayPass2(way, reader.newNodeLocationProvider());
assertTrue(feature.canBeLine());
@ -286,14 +266,16 @@ public class OsmReaderTest {
public void testInvalidPolygon() throws GeometryException {
OsmReader reader = newOsmReader();
reader.processPass1Element(node(1, 0.5, 0.5));
reader.processPass1Element(node(2, 0.75, 0.5));
reader.processPass1Element(node(3, 0.5, 0.75));
reader.processPass1Element(node(4, 0.75, 0.75));
var way = new ReaderWay(6);
reader.processPass1Block(List.of(
node(1, 0.5, 0.5),
node(2, 0.75, 0.5),
node(3, 0.5, 0.75),
node(4, 0.75, 0.75)
));
var way = new OsmElement.Way(6);
way.setTag("area", "yes");
way.getNodes().add(1, 2, 3, 4, 1);
reader.processPass1Element(way);
way.nodes().add(1, 2, 3, 4, 1);
reader.processPass1Block(List.of(way));
SourceFeature feature = reader.processWayPass2(way, reader.newNodeLocationProvider());
assertFalse(feature.canBeLine());
@ -322,16 +304,16 @@ public class OsmReaderTest {
assertEquals(1.207, feature.length(), 1e-2);
}
private ReaderNode node(long id, double x, double y) {
return new ReaderNode(id, GeoUtils.getWorldLat(y), GeoUtils.getWorldLon(x));
private OsmElement.Node node(long id, double x, double y) {
return new OsmElement.Node(id, GeoUtils.getWorldLat(y), GeoUtils.getWorldLon(x));
}
@Test
public void testLineReferencingNonexistentNode() {
OsmReader reader = newOsmReader();
var way = new ReaderWay(321);
way.getNodes().add(123, 2222, 333, 444, 123);
reader.processPass1Element(way);
var way = new OsmElement.Way(321);
way.nodes().add(123, 2222, 333, 444, 123);
reader.processPass1Block(List.of(way));
SourceFeature feature = reader.processWayPass2(way, reader.newNodeLocationProvider());
assertTrue(feature.canBeLine());
@ -350,27 +332,27 @@ public class OsmReaderTest {
assertThrows(GeometryException.class, feature::length);
}
private final Function<ReaderElement, Stream<ReaderNode>> nodes = elem ->
elem instanceof ReaderNode node ? Stream.of(node) : Stream.empty();
private final Function<OsmElement, Stream<OsmElement.Node>> nodes = elem ->
elem instanceof OsmElement.Node node ? Stream.of(node) : Stream.empty();
private final Function<ReaderElement, Stream<ReaderWay>> ways = elem ->
elem instanceof ReaderWay way ? Stream.of(way) : Stream.empty();
private final Function<OsmElement, Stream<OsmElement.Way>> ways = elem ->
elem instanceof OsmElement.Way way ? Stream.of(way) : Stream.empty();
@ParameterizedTest
@ValueSource(strings = {"multipolygon", "boundary", "land_area"})
public void testMultiPolygon(String relationType) throws GeometryException {
OsmReader reader = newOsmReader();
var outerway = new ReaderWay(9);
outerway.getNodes().add(1, 2, 3, 4, 1);
var innerway = new ReaderWay(10);
innerway.getNodes().add(5, 6, 7, 8, 5);
var outerway = new OsmElement.Way(9);
outerway.nodes().add(1, 2, 3, 4, 1);
var innerway = new OsmElement.Way(10);
innerway.nodes().add(5, 6, 7, 8, 5);
var relation = new ReaderRelation(11);
var relation = new OsmElement.Relation(11);
relation.setTag("type", relationType);
relation.add(new ReaderRelation.Member(ReaderRelation.WAY, outerway.getId(), "outer"));
relation.add(new ReaderRelation.Member(ReaderRelation.WAY, innerway.getId(), "inner"));
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, outerway.id(), "outer"));
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, innerway.id(), "inner"));
List<ReaderElement> elements = List.of(
List<OsmElement> elements = List.of(
node(1, 0.1, 0.1),
node(2, 0.9, 0.1),
node(3, 0.9, 0.9),
@ -387,7 +369,7 @@ public class OsmReaderTest {
relation
);
elements.forEach(reader::processPass1Element);
reader.processPass1Block(elements);
elements.stream().flatMap(nodes).forEach(reader::processNodePass2);
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
@ -422,21 +404,21 @@ public class OsmReaderTest {
@Test
public void testMultipolygonInfersCorrectParent() throws GeometryException {
OsmReader reader = newOsmReader();
var outerway = new ReaderWay(13);
outerway.getNodes().add(1, 2, 3, 4, 1);
var innerway = new ReaderWay(14);
innerway.getNodes().add(5, 6, 7, 8, 5);
var innerinnerway = new ReaderWay(15);
innerinnerway.getNodes().add(9, 10, 11, 12, 9);
var outerway = new OsmElement.Way(13);
outerway.nodes().add(1, 2, 3, 4, 1);
var innerway = new OsmElement.Way(14);
innerway.nodes().add(5, 6, 7, 8, 5);
var innerinnerway = new OsmElement.Way(15);
innerinnerway.nodes().add(9, 10, 11, 12, 9);
var relation = new ReaderRelation(16);
var relation = new OsmElement.Relation(16);
relation.setTag("type", "multipolygon");
relation.add(new ReaderRelation.Member(ReaderRelation.WAY, outerway.getId(), "outer"));
relation.add(new ReaderRelation.Member(ReaderRelation.WAY, innerway.getId(), "inner"));
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, outerway.id(), "outer"));
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, innerway.id(), "inner"));
// nested hole marked as inner, but should actually be outer
relation.add(new ReaderRelation.Member(ReaderRelation.WAY, innerinnerway.getId(), "inner"));
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, innerinnerway.id(), "inner"));
List<ReaderElement> elements = List.of(
List<OsmElement> elements = List.of(
node(1, 0.1, 0.1),
node(2, 0.9, 0.1),
node(3, 0.9, 0.9),
@ -459,7 +441,7 @@ public class OsmReaderTest {
relation
);
elements.forEach(reader::processPass1Element);
reader.processPass1Block(elements);
elements.stream().flatMap(nodes).forEach(reader::processNodePass2);
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
@ -488,21 +470,21 @@ public class OsmReaderTest {
@Test
public void testInvalidMultipolygon() throws GeometryException {
OsmReader reader = newOsmReader();
var outerway = new ReaderWay(13);
outerway.getNodes().add(1, 2, 3, 4, 1);
var innerway = new ReaderWay(14);
innerway.getNodes().add(5, 6, 7, 8, 5);
var innerinnerway = new ReaderWay(15);
innerinnerway.getNodes().add(9, 10, 11, 12, 9);
var outerway = new OsmElement.Way(13);
outerway.nodes().add(1, 2, 3, 4, 1);
var innerway = new OsmElement.Way(14);
innerway.nodes().add(5, 6, 7, 8, 5);
var innerinnerway = new OsmElement.Way(15);
innerinnerway.nodes().add(9, 10, 11, 12, 9);
var relation = new ReaderRelation(16);
var relation = new OsmElement.Relation(16);
relation.setTag("type", "multipolygon");
relation.add(new ReaderRelation.Member(ReaderRelation.WAY, outerway.getId(), "outer"));
relation.add(new ReaderRelation.Member(ReaderRelation.WAY, innerway.getId(), "inner"));
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, outerway.id(), "outer"));
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, innerway.id(), "inner"));
// nested hole marked as inner, but should actually be outer
relation.add(new ReaderRelation.Member(ReaderRelation.WAY, innerinnerway.getId(), "inner"));
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, innerinnerway.id(), "inner"));
List<ReaderElement> elements = List.of(
List<OsmElement> elements = List.of(
node(1, 0.1, 0.1),
node(2, 0.9, 0.1),
node(3, 0.9, 0.9),
@ -525,7 +507,7 @@ public class OsmReaderTest {
relation
);
elements.forEach(reader::processPass1Element);
reader.processPass1Block(elements);
elements.stream().flatMap(nodes).forEach(reader::processNodePass2);
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
@ -558,14 +540,14 @@ public class OsmReaderTest {
@Test
public void testMultiPolygonRefersToNonexistentNode() {
OsmReader reader = newOsmReader();
var outerway = new ReaderWay(5);
outerway.getNodes().add(1, 2, 3, 4, 1);
var outerway = new OsmElement.Way(5);
outerway.nodes().add(1, 2, 3, 4, 1);
var relation = new ReaderRelation(6);
var relation = new OsmElement.Relation(6);
relation.setTag("type", "multipolygon");
relation.add(new ReaderRelation.Member(ReaderRelation.WAY, outerway.getId(), "outer"));
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, outerway.id(), "outer"));
List<ReaderElement> elements = List.of(
List<OsmElement> elements = List.of(
node(1, 0.1, 0.1),
// node(2, 0.9, 0.1), MISSING!
node(3, 0.9, 0.9),
@ -576,7 +558,7 @@ public class OsmReaderTest {
relation
);
elements.forEach(reader::processPass1Element);
reader.processPass1Block(elements);
elements.stream().flatMap(nodes).forEach(reader::processNodePass2);
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
@ -592,11 +574,11 @@ public class OsmReaderTest {
public void testMultiPolygonRefersToNonexistentWay() {
OsmReader reader = newOsmReader();
var relation = new ReaderRelation(6);
var relation = new OsmElement.Relation(6);
relation.setTag("type", "multipolygon");
relation.add(new ReaderRelation.Member(ReaderRelation.WAY, 5, "outer"));
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, 5, "outer"));
List<ReaderElement> elements = List.of(
List<OsmElement> elements = List.of(
node(1, 0.1, 0.1),
node(2, 0.9, 0.1),
node(3, 0.9, 0.9),
@ -607,7 +589,7 @@ public class OsmReaderTest {
relation
);
elements.forEach(reader::processPass1Element);
reader.processPass1Block(elements);
elements.stream().flatMap(nodes).forEach(reader::processNodePass2);
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
@ -623,25 +605,22 @@ public class OsmReaderTest {
public void testWayInRelation() {
record OtherRelInfo(long id) implements OsmRelationInfo {}
record TestRelInfo(long id, String name) implements OsmRelationInfo {}
OsmReader reader = new OsmReader("osm", osmSource, nodeMap, new Profile.NullProfile() {
OsmReader reader = new OsmReader("osm", () -> osmSource, nodeMap, new Profile.NullProfile() {
@Override
public List<OsmRelationInfo> preprocessOsmRelation(OsmElement.Relation relation) {
return List.of(new TestRelInfo(1, "name"));
}
}, stats);
var nodeCache = reader.newNodeLocationProvider();
var node1 = new ReaderNode(1, 0, 0);
var node1 = new OsmElement.Node(1, 0, 0);
var node2 = node(2, 0.75, 0.75);
var way = new ReaderWay(3);
way.getNodes().add(node1.getId(), node2.getId());
var way = new OsmElement.Way(3);
way.nodes().add(node1.id(), node2.id());
way.setTag("key", "value");
var relation = new ReaderRelation(4);
relation.add(new ReaderRelation.Member(ReaderRelation.Member.WAY, 3, "rolename"));
var relation = new OsmElement.Relation(4);
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.WAY, 3, "rolename"));
reader.processPass1Element(node1);
reader.processPass1Element(node2);
reader.processPass1Element(way);
reader.processPass1Element(relation);
reader.processPass1Block(List.of(node1, node2, way, relation));
SourceFeature feature = reader.processWayPass2(way, nodeCache);
@ -653,26 +632,23 @@ public class OsmReaderTest {
@Test
public void testNodeOrWayRelationInRelationDoesntTriggerWay() {
record TestRelInfo(long id, String name) implements OsmRelationInfo {}
OsmReader reader = new OsmReader("osm", osmSource, nodeMap, new Profile.NullProfile() {
OsmReader reader = new OsmReader("osm", () -> osmSource, nodeMap, new Profile.NullProfile() {
@Override
public List<OsmRelationInfo> preprocessOsmRelation(OsmElement.Relation relation) {
return List.of(new TestRelInfo(1, "name"));
}
}, stats);
var nodeCache = reader.newNodeLocationProvider();
var node1 = new ReaderNode(1, 0, 0);
var node1 = new OsmElement.Node(1, 0, 0);
var node2 = node(2, 0.75, 0.75);
var way = new ReaderWay(3);
way.getNodes().add(node1.getId(), node2.getId());
var way = new OsmElement.Way(3);
way.nodes().add(node1.id(), node2.id());
way.setTag("key", "value");
var relation = new ReaderRelation(4);
relation.add(new ReaderRelation.Member(ReaderRelation.Member.RELATION, 3, "rolename"));
relation.add(new ReaderRelation.Member(ReaderRelation.Member.NODE, 3, "rolename"));
var relation = new OsmElement.Relation(4);
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.RELATION, 3, "rolename"));
relation.members().add(new OsmElement.Relation.Member(OsmElement.Type.NODE, 3, "rolename"));
reader.processPass1Element(node1);
reader.processPass1Element(node2);
reader.processPass1Element(way);
reader.processPass1Element(relation);
reader.processPass1Block(List.of(node1, node2, way, relation));
SourceFeature feature = reader.processWayPass2(way, nodeCache);
@ -680,6 +656,6 @@ public class OsmReaderTest {
}
private OsmReader newOsmReader() {
return new OsmReader("osm", osmSource, nodeMap, profile, stats);
return new OsmReader("osm", () -> osmSource, nodeMap, profile, stats);
}
}

Wyświetl plik

@ -0,0 +1,108 @@
package com.onthegomap.planetiler.worker;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
/**
 * Tests for {@code Distributor}, which hands work items between threads.
 *
 * <p>NOTE(review): {@code Distributor}'s contract is inferred from usage here — each
 * thread registers via {@code forThread(consumer)}, items are {@code accept}ed (and
 * may be handed to another thread when a thread's queue fills), {@code finish}/{@code
 * drain}/{@code close} end participation — confirm against the Distributor javadoc.
 */
public class DistributorTest {

  /** Closing a per-thread handle (twice is a no-op) rejects further items and processes nothing. */
  @Test
  @Timeout(10)
  public void testEmpty() {
    List<Integer> processed = new CopyOnWriteArrayList<>();
    Distributor<Integer> distributor = Distributor.createWithCapacity(1);
    var thisDistributor = distributor.forThread(processed::add);
    thisDistributor.close();
    // second close must be harmless
    thisDistributor.close();
    assertThrows(IllegalStateException.class, () -> thisDistributor.accept(3));
    assertEquals(List.of(), processed);
  }

  /** With a single thread, each accepted item is processed synchronously and in order. */
  @Test
  @Timeout(10)
  public void testDistributor1Thread() {
    List<Integer> processed = new CopyOnWriteArrayList<>();
    Distributor<Integer> distributor = Distributor.createWithCapacity(1);
    var thisDistributor = distributor.forThread(processed::add);
    assertEquals(List.of(), processed);
    thisDistributor.accept(1);
    assertEquals(List.of(1), processed);
    thisDistributor.accept(2);
    assertEquals(List.of(1, 2), processed);
    thisDistributor.close();
    // accepting after close is a programming error
    assertThrows(IllegalStateException.class, () -> thisDistributor.accept(3));
    assertEquals(List.of(1, 2), processed);
  }

  /**
   * Two threads share one distributor; CountDownLatches a-f force a deterministic
   * interleaving so the final processing order can be asserted exactly.
   *
   * <p>Expected sequence: thread1 processes 1,2; thread2 processes 3,4 and finishes;
   * thread1 then accepts 5 and 6 — 6 overflows thread1's capacity-1 queue and is
   * picked up elsewhere before 5, yielding the final order 1,2,3,4,6,5.
   */
  @Test
  @Timeout(10)
  public void testDistributor2Threads() throws InterruptedException {
    List<Integer> processed = new CopyOnWriteArrayList<>();
    Distributor<Integer> distributor = Distributor.createWithCapacity(1);
    CountDownLatch a = new CountDownLatch(1);
    CountDownLatch b = new CountDownLatch(1);
    CountDownLatch c = new CountDownLatch(1);
    CountDownLatch d = new CountDownLatch(1);
    CountDownLatch e = new CountDownLatch(1);
    CountDownLatch f = new CountDownLatch(1);
    Thread thread1 = new Thread(() -> {
      try {
        var thisDistributor = distributor.forThread(processed::add);
        thisDistributor.accept(1);
        thisDistributor.accept(2);
        a.countDown();
        c.await();
        d.await();
        thisDistributor.accept(5);
        thisDistributor.accept(6); // queue full
        e.countDown();
        f.await();
        thisDistributor.close();
      } catch (InterruptedException ex) {
        throw new RuntimeException(ex);
      }
    });
    thread1.start();
    Thread thread2 = new Thread(() -> {
      try {
        var thisDistributor = distributor.forThread(processed::add);
        a.await();
        thisDistributor.accept(3);
        thisDistributor.accept(4);
        b.countDown();
        thisDistributor.finish();
        d.countDown();
        f.await();
        // drain: process any items other threads handed off (item 5)
        thisDistributor.drain();
      } catch (InterruptedException ex) {
        throw new RuntimeException(ex);
      }
    });
    thread2.start();
    b.await();
    assertEquals(List.of(1, 2, 3, 4), processed);
    c.countDown();
    e.await();
    assertEquals(List.of(1, 2, 3, 4, 6), processed);
    f.countDown();
    thread1.join();
    thread2.join();
    assertEquals(List.of(1, 2, 3, 4, 6, 5), processed);
  }
}

Wyświetl plik

@ -0,0 +1,67 @@
package com.onthegomap.planetiler.worker;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
/**
 * Tests for {@code WeightedHandoffQueue}: items go in through {@code accept(item,
 * weight)}, come out in FIFO order through {@code get()}, and {@code get()} returns
 * {@code null} once the queue has been closed and fully drained.
 */
public class WeightedHandoffQueueTest {

  /** A queue that is closed while empty immediately yields null. */
  @Test
  @Timeout(10)
  public void testEmpty() {
    WeightedHandoffQueue<String> queue = new WeightedHandoffQueue<>(1, 1);
    queue.close();
    assertNull(queue.get());
  }

  /** A single element round-trips, then close marks the queue exhausted. */
  @Test
  @Timeout(10)
  public void testOneItem() {
    WeightedHandoffQueue<String> queue = new WeightedHandoffQueue<>(1, 1);
    queue.accept("a", 1);
    assertEquals("a", queue.get());
    queue.close();
    assertNull(queue.get());
  }

  /** Closing before the consumer reads must not drop the pending element. */
  @Test
  @Timeout(10)
  public void testOneItemCloseFirst() {
    WeightedHandoffQueue<String> queue = new WeightedHandoffQueue<>(2, 1);
    queue.accept("a", 1);
    queue.close();
    assertEquals("a", queue.get());
    assertNull(queue.get());
  }

  /** More elements than the batch size still come out in insertion order. */
  @Test
  @Timeout(10)
  public void testMoreItemsThanBatchSize() {
    WeightedHandoffQueue<String> queue = new WeightedHandoffQueue<>(3, 2);
    for (String item : List.of("a", "b", "c")) {
      queue.accept(item, 1);
    }
    queue.close();
    for (String expected : List.of("a", "b", "c")) {
      assertEquals(expected, queue.get());
    }
    assertNull(queue.get());
  }

  /** Bulk round-trip: 950 elements span many batches yet preserve FIFO order. */
  @Test
  @Timeout(10)
  public void testManyItems() {
    final int count = 950;
    WeightedHandoffQueue<Integer> queue = new WeightedHandoffQueue<>(100, 100);
    int produced = 0;
    while (produced < count) {
      queue.accept(produced, 1);
      produced++;
    }
    queue.close();
    int consumed = 0;
    while (consumed < count) {
      assertEquals(Integer.valueOf(consumed), queue.get());
      consumed++;
    }
    assertNull(queue.get());
  }
}