working merge sort, started wrapping with feature map

pull/1/head
Mike Barry 2021-04-28 05:45:33 -04:00
rodzic ed97a45958
commit 14e28619e2
21 zmienionych plików z 1057 dodań i 92 usunięć

Wyświetl plik

@ -13,6 +13,7 @@ jobs:
build:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v2

Wyświetl plik

@ -41,7 +41,6 @@
<option name="INDENT_CHAINED_CALLS" value="false" />
</JSCodeStyleSettings>
<JavaCodeStyleSettings>
<option name="INSERT_INNER_CLASS_IMPORTS" value="true" />
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
<option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">

Wyświetl plik

@ -130,6 +130,12 @@
<version>3.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>

Wyświetl plik

@ -1,5 +1,6 @@
package com.onthegomap.flatmap;
import com.onthegomap.flatmap.collections.MergeSort;
import java.util.function.Consumer;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryCollection;
@ -21,11 +22,11 @@ public class FeatureRenderer {
this.config = config;
}
public void renderFeature(RenderableFeature feature, Consumer<RenderedFeature> consumer) {
public void renderFeature(RenderableFeature feature, Consumer<MergeSort.Entry> consumer) {
renderGeometry(feature.getGeometry(), feature, consumer);
}
public void renderGeometry(Geometry geom, RenderableFeature feature, Consumer<RenderedFeature> consumer) {
public void renderGeometry(Geometry geom, RenderableFeature feature, Consumer<MergeSort.Entry> consumer) {
// TODO what about converting between area and line?
if (geom instanceof Point point) {
addPointFeature(feature, point, consumer);
@ -43,15 +44,15 @@ public class FeatureRenderer {
}
}
private void addPointFeature(RenderableFeature feature, Point point, Consumer<RenderedFeature> consumer) {
private void addPointFeature(RenderableFeature feature, Point point, Consumer<MergeSort.Entry> consumer) {
// TODO render features into tile
}
private void addPointFeature(RenderableFeature feature, MultiPoint points, Consumer<RenderedFeature> consumer) {
private void addPointFeature(RenderableFeature feature, MultiPoint points, Consumer<MergeSort.Entry> consumer) {
// TODO render features into tile
}
private void addLinearFeature(RenderableFeature feature, Geometry geom, Consumer<RenderedFeature> consumer) {
private void addLinearFeature(RenderableFeature feature, Geometry geom, Consumer<MergeSort.Entry> consumer) {
// TODO render lines / areas into tile
}
}

Wyświetl plik

@ -1,6 +1,8 @@
package com.onthegomap.flatmap;
import com.onthegomap.flatmap.VectorTileEncoder.VectorTileFeature;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap.FeatureMapKey;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap.FeatureMapValue;
import java.util.Map;
public record LayerFeature(
@ -13,4 +15,16 @@ public record LayerFeature(
long id
) implements VectorTileFeature {
public static LayerFeature of(FeatureMapKey key, FeatureMapValue value) {
return new LayerFeature(
key.hasGroup(),
value.group(),
key.zOrder(),
value.attrs(),
value.geomType(),
value.commands(),
value.featureId()
);
}
}

Wyświetl plik

@ -1,6 +1,7 @@
package com.onthegomap.flatmap;
import com.onthegomap.flatmap.collections.LongLongMap;
import com.onthegomap.flatmap.collections.MergeSort;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap;
import com.onthegomap.flatmap.profiles.OpenMapTilesProfile;
import com.onthegomap.flatmap.reader.NaturalEarthReader;
@ -64,9 +65,9 @@ public class OpenMapTilesMain {
FileUtils.forceMkdir(tmpDir.toFile());
File nodeDb = tmpDir.resolve("node.db").toFile();
Path featureDb = tmpDir.resolve("feature.db");
LongLongMap nodeLocations = new LongLongMap.MapdbSortedTable(nodeDb);
MergeSortFeatureMap featureMap = new MergeSortFeatureMap(featureDb, stats);
MergeSort featureDb = new MergeSort(tmpDir.resolve("feature.db"), threads, stats);
MergeSortFeatureMap featureMap = new MergeSortFeatureMap(featureDb, profile);
FlatMapConfig config = new FlatMapConfig(profile, envelope, threads, stats, logInterval);
FeatureRenderer renderer = new FeatureRenderer(config);
@ -97,7 +98,7 @@ public class OpenMapTilesMain {
profile.release();
nodeDb.delete();
stats.time("sort", featureMap::sort);
stats.time("sort", featureDb::sort);
stats.time("mbtiles", () -> MbtilesWriter.writeOutput(featureCount.get(), featureMap, output, config));
stats.stopTimer("import");

Wyświetl plik

@ -1,6 +1,7 @@
package com.onthegomap.flatmap;
import com.graphhopper.reader.ReaderRelation;
import com.onthegomap.flatmap.VectorTileEncoder.VectorTileFeature;
import com.onthegomap.flatmap.reader.OpenStreetMapReader.RelationInfo;
import java.util.List;
@ -12,4 +13,29 @@ public interface Profile {
void release();
List<VectorTileFeature> postProcessLayerFeatures(String layer, int zoom, List<VectorTileFeature> items);
class NullProfile implements Profile {
@Override
public List<RelationInfo> preprocessOsmRelation(ReaderRelation relation) {
return null;
}
@Override
public void processFeature(SourceFeature sourceFeature, RenderableFeatures features) {
}
@Override
public void release() {
}
@Override
public List<VectorTileFeature> postProcessLayerFeatures(String layer, int zoom,
List<VectorTileFeature> items) {
return items;
}
}
}

Wyświetl plik

@ -1,5 +0,0 @@
package com.onthegomap.flatmap;
public record RenderedFeature(long sort, byte[] value) {
}

Wyświetl plik

@ -292,7 +292,7 @@ public class VectorTileEncoder {
Map<String, Object> attrs();
}
public VectorTileEncoder addLayerFeatures(String layerName, List<VectorTileFeature> features) {
public VectorTileEncoder addLayerFeatures(String layerName, List<? extends VectorTileFeature> features) {
if (features.isEmpty()) {
return this;
}

Wyświetl plik

@ -0,0 +1,31 @@
package com.onthegomap.flatmap.collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
public class CommonStringEncoder {
private final ConcurrentMap<String, Byte> stringToId = new ConcurrentHashMap<>(255);
private final String[] idToLayer = new String[255];
private final AtomicInteger layerId = new AtomicInteger(0);
public String decode(byte id) {
String str = idToLayer[id];
if (str == null) {
throw new IllegalStateException("No string for " + id);
}
return str;
}
public byte encode(String string) {
return stringToId.computeIfAbsent(string, s -> {
int id = layerId.getAndIncrement();
if (id > 250) {
throw new IllegalStateException("Too many string keys when inserting " + string);
}
idToLayer[id] = string;
return (byte) id;
});
}
}

Wyświetl plik

@ -0,0 +1,384 @@
package com.onthegomap.flatmap.collections;
import com.google.common.annotations.VisibleForTesting;
import com.onthegomap.flatmap.monitoring.ProcessInfo;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.worker.Topology;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.io.FileUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MergeSort implements Iterable<MergeSort.Entry> {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeSort.class);
private static final long MAX_CHUNK_SIZE = 1_000_000_000; // 1GB
private final Path dir;
private final Stats stats;
private final int chunkSizeLimit;
private final int workers;
private final List<Chunk> chunks = new ArrayList<>();
private Chunk current;
private volatile boolean sorted = false;
public MergeSort(Path tempDir, int threads, Stats stats) {
this(
tempDir,
threads,
(int) Math.min(
MAX_CHUNK_SIZE,
(ProcessInfo.getMaxMemoryBytes() / 2) / threads
),
stats
);
}
public MergeSort(Path dir, int workers, int chunkSizeLimit, Stats stats) {
this.dir = dir;
this.stats = stats;
this.chunkSizeLimit = chunkSizeLimit;
long memory = ProcessInfo.getMaxMemoryBytes();
if (chunkSizeLimit > memory / 2) {
throw new IllegalStateException(
"Not enough memory to use chunk size " + chunkSizeLimit + " only have " + memory);
}
this.workers = workers;
LOGGER.info("Using merge sort feature map, chunk size=" + (chunkSizeLimit / 1_000_000) + "mb workers=" + workers);
try {
FileUtils.deleteDirectory(dir.toFile());
Files.createDirectories(dir);
newChunk();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
public void add(Entry item) {
try {
assert !sorted;
current.add(item);
if (current.bytesInMemory > chunkSizeLimit) {
newChunk();
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
public long getStorageSize() {
return FileUtils.sizeOfDirectory(dir.toFile());
}
private static <T> T time(AtomicLong timer, Supplier<T> func) {
long start = System.nanoTime();
try {
return func.get();
} finally {
timer.addAndGet(System.nanoTime() - start);
}
}
public void sort() {
assert !sorted;
if (current != null) {
try {
current.close();
} catch (IOException e) {
// ok
}
}
long start = System.nanoTime();
AtomicLong reading = new AtomicLong(0);
AtomicLong writing = new AtomicLong(0);
AtomicLong sorting = new AtomicLong(0);
AtomicLong doneCounter = new AtomicLong(0);
CompletableFuture<ProgressLoggers> logger = new CompletableFuture<>();
var topology = Topology.start("sort", stats)
.readFromTiny("item_queue", chunks)
.sinkToConsumer("worker", workers, chunk -> {
var toSort = time(reading, chunk::readAll);
time(sorting, toSort::sort);
time(writing, toSort::flush);
doneCounter.incrementAndGet();
try {
logger.get().log();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
});
ProgressLoggers loggers = new ProgressLoggers("sort")
.addPercentCounter("chunks", chunks.size(), doneCounter)
.addFileSize(this::getStorageSize)
.addProcessStats()
.addTopologyStats(topology);
logger.complete(loggers);
topology.await();
sorted = true;
LOGGER.info("Sorted all chunks " + Duration.ofNanos(System.nanoTime() - start).toSeconds() +
"s read:" + Duration.ofNanos(reading.get()).toSeconds() +
"s write:" + Duration.ofNanos(writing.get()).toSeconds() +
"s sort:" + Duration.ofNanos(sorting.get()).toSeconds() + "s");
}
@NotNull
@Override
public Iterator<Entry> iterator() {
assert sorted;
PriorityQueue<PeekableScanner> queue = new PriorityQueue<>(chunks.size());
for (Chunk chunk : chunks) {
if (chunk.itemCount > 0) {
queue.add(chunk.newReader());
}
}
return new Iterator<>() {
@Override
public boolean hasNext() {
return !queue.isEmpty();
}
@Override
public Entry next() {
PeekableScanner scanner = queue.poll();
assert scanner != null;
Entry next = scanner.next();
if (scanner.hasNext()) {
queue.add(scanner);
}
return next;
}
};
}
private void newChunk() throws IOException {
Path chunkPath = dir.resolve("chunk" + (chunks.size() + 1));
chunkPath.toFile().deleteOnExit();
if (current != null) {
current.close();
}
chunks.add(current = new Chunk(chunkPath));
}
@VisibleForTesting
List<Entry> toList() {
List<Entry> result = new ArrayList<>();
for (Entry item : this) {
result.add(item);
}
return result;
}
class Chunk implements Closeable {
private final Path path;
private final DataOutputStream outputStream;
private int bytesInMemory = 0;
private int itemCount = 0;
private Chunk(Path path) throws IOException {
this.path = path;
this.outputStream = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(path), 50_000));
}
public PeekableScanner newReader() {
return new PeekableScanner(path, itemCount);
}
public void add(Entry entry) throws IOException {
write(outputStream, entry);
bytesInMemory +=
// pointer to feature
8 +
// Feature class overhead
16 +
// long sort member of feature
8 +
// byte array pointer
8 +
// byte array size
24 + entry.value.length;
itemCount++;
}
public class SortableChunk {
private Entry[] featuresToSort;
private SortableChunk(Entry[] featuresToSort) {
this.featuresToSort = featuresToSort;
}
public SortableChunk sort() {
Arrays.sort(featuresToSort);
return this;
}
public SortableChunk flush() {
try (DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(Files.newOutputStream(path), 50_000))) {
for (Entry feature : featuresToSort) {
write(out, feature);
}
featuresToSort = null;
return this;
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}
public SortableChunk readAll() {
try (PeekableScanner scanner = newReader()) {
Entry[] featuresToSort = new Entry[itemCount];
int i = 0;
while (scanner.hasNext()) {
featuresToSort[i] = scanner.next();
i++;
}
if (i != itemCount) {
throw new IllegalStateException("Expected " + itemCount + " features in " + path + " got " + i);
}
return new SortableChunk(featuresToSort);
}
}
public static void write(DataOutputStream out, Entry entry) throws IOException {
out.writeLong(entry.sortKey);
out.writeInt(entry.value.length);
out.write(entry.value);
}
@Override
public void close() throws IOException {
outputStream.close();
}
}
class PeekableScanner implements Closeable, Comparable<PeekableScanner>, Iterator<Entry> {
private final int count;
private int read = 0;
private final DataInputStream input;
private Entry next;
PeekableScanner(Path path, int count) {
this.count = count;
try {
input = new DataInputStream(new BufferedInputStream(Files.newInputStream(path), 50_000));
next = readNextFeature();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public boolean hasNext() {
return next != null;
}
@Override
public Entry next() {
Entry current = next;
if ((next = readNextFeature()) == null) {
close();
}
return current;
}
private Entry readNextFeature() {
if (read < count) {
try {
long nextSort = input.readLong();
int length = input.readInt();
byte[] bytes = input.readNBytes(length);
read++;
return new Entry(nextSort, bytes);
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
return null;
}
}
@Override
public void close() {
try {
input.close();
} catch (IOException e) {
LOGGER.warn("Error closing chunk", e);
}
}
@Override
public int compareTo(@NotNull PeekableScanner o) {
return next.compareTo(o.next);
}
}
public static record Entry(long sortKey, byte[] value) implements Comparable<Entry> {
@Override
public int compareTo(@NotNull Entry o) {
return Long.compare(sortKey, o.sortKey);
}
@Override
public String toString() {
return "MergeSort.Entry{" +
"sortKey=" + sortKey +
", value=" + Arrays.toString(value) +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Entry entry = (Entry) o;
if (sortKey != entry.sortKey) {
return false;
}
return Arrays.equals(value, entry.value);
}
@Override
public int hashCode() {
int result = (int) (sortKey ^ (sortKey >>> 32));
result = 31 * result + Arrays.hashCode(value);
return result;
}
}
}

Wyświetl plik

@ -1,64 +1,100 @@
package com.onthegomap.flatmap.collections;
import com.carrotsearch.hppc.LongArrayList;
import com.onthegomap.flatmap.RenderedFeature;
import com.carrotsearch.hppc.LongLongHashMap;
import com.graphhopper.coll.GHLongLongHashMap;
import com.onthegomap.flatmap.LayerFeature;
import com.onthegomap.flatmap.Profile;
import com.onthegomap.flatmap.VectorTileEncoder;
import com.onthegomap.flatmap.VectorTileEncoder.VectorTileFeature;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap.TileFeatures;
import com.onthegomap.flatmap.geo.TileCoord;
import com.onthegomap.flatmap.monitoring.Stats;
import java.nio.file.Path;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.jetbrains.annotations.NotNull;
import org.locationtech.jts.geom.Geometry;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
import org.msgpack.value.Value;
import org.msgpack.value.ValueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MergeSortFeatureMap implements Consumer<RenderedFeature>, Iterable<TileFeatures> {
public record MergeSortFeatureMap(MergeSort mergeSort, Profile profile, CommonStringEncoder commonStrings)
implements Consumer<MergeSort.Entry>, Iterable<TileFeatures> {
private volatile boolean prepared = false;
private static final Logger LOGGER = LoggerFactory.getLogger(MergeSortFeatureMap.class);
public MergeSortFeatureMap(Path featureDb, Stats stats) {
}
public void sort() {
prepared = true;
public MergeSortFeatureMap(MergeSort mergeSort, Profile profile) {
this(mergeSort, profile, new CommonStringEncoder());
}
@Override
public void accept(RenderedFeature renderedFeature) {
if (prepared) {
throw new IllegalStateException("Attempting to add feature but already prepared");
}
}
public long getStorageSize() {
return 0;
public void accept(MergeSort.Entry entry) {
mergeSort.add(entry);
}
@Override
public Iterator<TileFeatures> iterator() {
if (!prepared) {
throw new IllegalStateException("Attempting to iterate over features but not prepared yet");
Iterator<MergeSort.Entry> entries = mergeSort.iterator();
if (!entries.hasNext()) {
return Collections.emptyIterator();
}
MergeSort.Entry firstFeature = entries.next();
byte[] firstData = firstFeature.value();
long firstSort = firstFeature.sortKey();
return new Iterator<>() {
private byte[] last = firstData;
private long lastSortKey = firstSort;
private int lastTileId = FeatureMapKey.extractTileFromKey(firstSort);
@Override
public boolean hasNext() {
return false;
return last != null;
}
@Override
public TileFeatures next() {
return null;
TileFeatures result = new TileFeatures(lastTileId);
result.add(lastSortKey, last);
int lastTile = lastTileId;
while (entries.hasNext()) {
MergeSort.Entry next = entries.next();
last = next.value();
lastSortKey = next.sortKey();
lastTileId = FeatureMapKey.extractTileFromKey(lastSortKey);
if (lastTile != lastTileId) {
return result;
}
result.add(next.sortKey(), last);
}
return result;
}
};
}
public static class TileFeatures {
public long getStorageSize() {
return mergeSort.getStorageSize();
}
public class TileFeatures implements Consumer<MergeSort.Entry> {
private final TileCoord tile;
private final LongArrayList sortKeys = new LongArrayList();
private final List<byte[]> entries = new ArrayList<>();
private LongLongHashMap counts = null;
private byte layer = Byte.MAX_VALUE;
public TileFeatures(int tile) {
this.tile = TileCoord.decode(tile);
}
@ -72,11 +108,76 @@ public class MergeSortFeatureMap implements Consumer<RenderedFeature>, Iterable<
}
public boolean hasSameContents(TileFeatures other) {
return false;
if (other == null || other.entries.size() != entries.size()) {
return false;
}
for (int i = 0; i < entries.size(); i++) {
byte[] a = entries.get(i);
byte[] b = other.entries.get(i);
if (!Arrays.equals(a, b)) {
return false;
}
}
return true;
}
public VectorTileEncoder getTile() {
return null;
VectorTileEncoder encoder = new VectorTileEncoder();
List<VectorTileFeature> items = new ArrayList<>(entries.size());
String currentLayer = null;
for (int index = entries.size() - 1; index >= 0; index--) {
byte[] entry = entries.get(index);
long sortKey = sortKeys.get(index);
FeatureMapKey key = FeatureMapKey.decode(sortKey);
FeatureMapValue value = FeatureMapValue.decode(entry, key.hasGroup(), commonStrings);
String layer = commonStrings.decode(key.layer);
if (currentLayer == null) {
currentLayer = layer;
} else if (!currentLayer.equals(layer)) {
encoder.addLayerFeatures(
currentLayer,
profile.postProcessLayerFeatures(currentLayer, tile.z(), items)
);
currentLayer = layer;
items.clear();
}
items.add(LayerFeature.of(key, value));
}
encoder.addLayerFeatures(
currentLayer,
profile.postProcessLayerFeatures(currentLayer, tile.z(), items)
);
return encoder;
}
public TileFeatures add(long sortKey, byte[] entry) {
if (FeatureMapKey.extractHasGroupFromKey(sortKey)) {
byte thisLayer = FeatureMapKey.extractLayerIdFromKey(sortKey);
if (counts == null) {
counts = new GHLongLongHashMap();
layer = thisLayer;
} else if (thisLayer != layer) {
layer = thisLayer;
counts.clear();
}
var groupInfo = FeatureMapValue.decodeGroupInfo(entry);
long old = counts.getOrDefault(groupInfo.group, 0);
if (old >= groupInfo.limit && groupInfo.limit > 0) {
return this;
}
counts.put(groupInfo.group, old + 1);
}
sortKeys.add(sortKey);
entries.add(entry);
return this;
}
@Override
public void accept(MergeSort.Entry renderedFeature) {
add(renderedFeature.sortKey(), renderedFeature.value());
}
@Override
@ -88,4 +189,199 @@ public class MergeSortFeatureMap implements Consumer<RenderedFeature>, Iterable<
'}';
}
}
private static final ThreadLocal<MessageBufferPacker> messagePackers = ThreadLocal
.withInitial(MessagePack::newDefaultBufferPacker);
public record FeatureMapKey(long encoded, TileCoord tile, byte layer, int zOrder, boolean hasGroup) implements
Comparable<FeatureMapKey> {
private static final int Z_ORDER_MASK = (1 << 23) - 1;
public static final int Z_ORDER_MAX = (1 << 22) - 1;
public static final int Z_ORDER_MIN = -(1 << 22);
public static final int Z_ORDER_BITS = 23;
public static FeatureMapKey of(int tile, byte layer, int zOrder, boolean hasGroup) {
return new FeatureMapKey(encode(tile, layer, zOrder, hasGroup), TileCoord.decode(tile), layer, zOrder, hasGroup);
}
public static FeatureMapKey decode(long encoded) {
return of(
extractTileFromKey(encoded),
extractLayerIdFromKey(encoded),
extractZorderFromKey(encoded),
extractHasGroupFromKey(encoded)
);
}
public static long encode(int tile, byte layer, int zOrder, boolean hasGroup) {
return ((long) tile << 32L) | ((long) (layer & 0xff) << 24L) | (((zOrder - Z_ORDER_MIN) & Z_ORDER_MASK) << 1L) | (
hasGroup ? 1 : 0);
}
public static boolean extractHasGroupFromKey(long sortKey) {
return (sortKey & 1) == 1;
}
public static int extractTileFromKey(long sortKey) {
return (int) (sortKey >> 32L);
}
public static byte extractLayerIdFromKey(long sortKey) {
return (byte) (sortKey >> 24);
}
public static int extractZorderFromKey(long sortKey) {
return ((int) ((sortKey >> 1) & Z_ORDER_MASK) + Z_ORDER_MIN);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FeatureMapKey that = (FeatureMapKey) o;
return encoded == that.encoded;
}
@Override
public int hashCode() {
return (int) (encoded ^ (encoded >>> 32));
}
@Override
public int compareTo(@NotNull FeatureMapKey o) {
return Long.compare(encoded, o.encoded);
}
}
public static record FeatureMapValue(
long featureId,
Map<String, Object> attrs,
int[] commands,
byte geomType,
boolean hasGrouping,
int groupLimit,
long group
) {
public static record GroupInfo(long group, int limit) {
}
public static GroupInfo decodeGroupInfo(byte[] encoded) {
try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded)) {
long group = unpacker.unpackLong();
int limit = unpacker.unpackInt();
return new GroupInfo(group, limit);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
public static FeatureMapValue from(
long featureId,
Map<String, Object> attrs,
Geometry geom,
boolean hasGrouping,
int groupLimit
) {
long group = geom.getUserData() instanceof Long longValue ? longValue : 0;
byte geomType = (byte) VectorTileEncoder.toGeomType(geom).getNumber();
int[] commands = VectorTileEncoder.getCommands(geom);
return new FeatureMapValue(
featureId,
attrs,
commands,
geomType,
hasGrouping,
groupLimit,
group
);
}
public static FeatureMapValue decode(byte[] encoded, boolean hasGroup, CommonStringEncoder commonStrings) {
try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded)) {
long group = 0;
int groupLimit = -1;
if (hasGroup) {
group = unpacker.unpackLong();
groupLimit = unpacker.unpackInt();
}
long id = unpacker.unpackLong();
byte geomType = unpacker.unpackByte();
int mapSize = unpacker.unpackMapHeader();
Map<String, Object> attrs = new HashMap<>(mapSize);
for (int i = 0; i < mapSize; i++) {
String key = commonStrings.decode(unpacker.unpackByte());
Value v = unpacker.unpackValue();
if (v.isStringValue()) {
attrs.put(key, v.asStringValue().asString());
} else if (v.isIntegerValue()) {
attrs.put(key, v.asIntegerValue().toLong());
} else if (v.isFloatValue()) {
attrs.put(key, v.asFloatValue().toDouble());
} else if (v.isBooleanValue()) {
attrs.put(key, v.asBooleanValue().getBoolean());
}
}
int commandSize = unpacker.unpackArrayHeader();
int[] commands = new int[commandSize];
for (int i = 0; i < commandSize; i++) {
commands[i] = unpacker.unpackInt();
}
return new FeatureMapValue(id, attrs, commands, geomType, hasGroup, groupLimit, group);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
public byte[] encode(CommonStringEncoder commonStrings) {
MessageBufferPacker packer = messagePackers.get();
packer.clear();
try {
if (hasGrouping) {
packer.packLong(group);
packer.packInt(groupLimit);
}
packer.packLong(featureId);
packer.packByte(geomType);
packer.packMapHeader((int) attrs.values().stream().filter(Objects::nonNull).count());
for (Map.Entry<String, Object> entry : attrs.entrySet()) {
if (entry.getValue() != null) {
packer.packByte(commonStrings.encode(entry.getKey()));
Object value = entry.getValue();
if (value instanceof String) {
packer.packValue(ValueFactory.newString((String) value));
} else if (value instanceof Integer) {
packer.packValue(ValueFactory.newInteger(((Integer) value).longValue()));
} else if (value instanceof Long) {
packer.packValue(ValueFactory.newInteger((Long) value));
} else if (value instanceof Float) {
packer.packValue(ValueFactory.newFloat((Float) value));
} else if (value instanceof Double) {
packer.packValue(ValueFactory.newFloat((Double) value));
} else if (value instanceof Boolean) {
packer.packValue(ValueFactory.newBoolean((Boolean) value));
} else {
packer.packValue(ValueFactory.newString(value.toString()));
}
}
}
packer.packArrayHeader(commands.length);
for (int command : commands) {
packer.packInt(command);
}
packer.close();
} catch (IOException e) {
throw new IllegalStateException(e);
}
return packer.toByteArray();
}
}
}

Wyświetl plik

@ -1,18 +1,9 @@
package com.onthegomap.flatmap.geo;
public class TileCoord {
public record TileCoord(int encoded, int x, int y, int z) {
private final int encoded;
private final int x;
private final int y;
private final int z;
private TileCoord(int encoded, int x, int y, int z) {
public TileCoord {
assert z <= 14;
this.encoded = encoded;
this.x = x;
this.y = y;
this.z = z;
}
public static TileCoord ofXYZ(int x, int y, int z) {
@ -23,22 +14,6 @@ public class TileCoord {
return new TileCoord(encoded, decodeX(encoded), decodeY(encoded), decodeZ(encoded));
}
public int encoded() {
return encoded;
}
public int x() {
return x;
}
public int y() {
return y;
}
public int z() {
return z;
}
@Override
public boolean equals(Object o) {
if (this == o) {

Wyświetl plik

@ -50,6 +50,10 @@ public class ProcessInfo {
return null;
}
public static long getMaxMemoryBytes() {
return Runtime.getRuntime().maxMemory();
}
public static record ThreadState(String name, long cpuTimeNanos, long id) {

Wyświetl plik

@ -5,6 +5,7 @@ import com.graphhopper.reader.ReaderRelation;
import com.onthegomap.flatmap.Profile;
import com.onthegomap.flatmap.RenderableFeatures;
import com.onthegomap.flatmap.SourceFeature;
import com.onthegomap.flatmap.VectorTileEncoder;
import com.onthegomap.flatmap.reader.OpenStreetMapReader.RelationInfo;
import java.util.List;
import org.slf4j.Logger;
@ -18,6 +19,12 @@ public class OpenMapTilesProfile implements Profile {
public void release() {
}
@Override
public List<VectorTileEncoder.VectorTileFeature> postProcessLayerFeatures(String layer, int zoom,
List<VectorTileEncoder.VectorTileFeature> items) {
return items;
}
@Override
public List<RelationInfo> preprocessOsmRelation(ReaderRelation relation) {
return null;

Wyświetl plik

@ -14,10 +14,10 @@ import com.onthegomap.flatmap.OsmInputFile;
import com.onthegomap.flatmap.Profile;
import com.onthegomap.flatmap.RenderableFeature;
import com.onthegomap.flatmap.RenderableFeatures;
import com.onthegomap.flatmap.RenderedFeature;
import com.onthegomap.flatmap.SourceFeature;
import com.onthegomap.flatmap.collections.LongLongMap;
import com.onthegomap.flatmap.collections.LongLongMultimap;
import com.onthegomap.flatmap.collections.MergeSort;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap;
import com.onthegomap.flatmap.geo.GeoUtils;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
@ -119,7 +119,7 @@ public class OpenStreetMapReader implements Closeable {
var topology = Topology.start("osm_pass2", stats)
.fromGenerator("pbf", osmInputFile.read(readerThreads))
.addBuffer("reader_queue", 50_000, 1_000)
.<RenderedFeature>addWorker("process", processThreads, (prev, next) -> {
.<MergeSort.Entry>addWorker("process", processThreads, (prev, next) -> {
RenderableFeatures features = new RenderableFeatures();
ReaderElement readerElement;
while ((readerElement = prev.get()) != null) {

Wyświetl plik

@ -5,8 +5,8 @@ import com.onthegomap.flatmap.FlatMapConfig;
import com.onthegomap.flatmap.Profile;
import com.onthegomap.flatmap.RenderableFeature;
import com.onthegomap.flatmap.RenderableFeatures;
import com.onthegomap.flatmap.RenderedFeature;
import com.onthegomap.flatmap.SourceFeature;
import com.onthegomap.flatmap.collections.MergeSort;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
@ -38,7 +38,7 @@ public abstract class Reader implements Closeable {
var topology = Topology.start(name, stats)
.fromGenerator("read", read())
.addBuffer("read_queue", 1000)
.<RenderedFeature>addWorker("process", threads, (prev, next) -> {
.<MergeSort.Entry>addWorker("process", threads, (prev, next) -> {
RenderableFeatures features = new RenderableFeatures();
SourceFeature sourceFeature;
while ((sourceFeature = prev.get()) != null) {

Wyświetl plik

@ -3,6 +3,7 @@ package com.onthegomap.flatmap.worker;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -100,6 +101,14 @@ public record Topology<T>(
}, 1);
}
public <T> Builder<?, T> readFromTiny(String name, Collection<T> items) {
WorkQueue<T> queue = new WorkQueue<>(prefix + "_" + name, items.size(), 1, stats);
for (T item : items) {
queue.accept(item);
}
return readFromQueue(queue);
}
public <T> Builder<?, T> readFromQueue(WorkQueue<T> input) {
return new Builder<>(input, stats);
}

Wyświetl plik

@ -0,0 +1,29 @@
package com.onthegomap.flatmap.collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.junit.jupiter.api.Test;
public class CommonStringEncoderTest {
private final CommonStringEncoder commonStringEncoder = new CommonStringEncoder();
@Test
public void testRoundTrip() {
byte a = commonStringEncoder.encode("a");
byte b = commonStringEncoder.encode("b");
assertEquals("a", commonStringEncoder.decode(a));
assertEquals(a, commonStringEncoder.encode("a"));
assertEquals("b", commonStringEncoder.decode(b));
assertThrows(IllegalStateException.class, () -> commonStringEncoder.decode((byte) (b + 1)));
}
@Test
public void testLimitsTo250() {
for (int i = 0; i <= 250; i++) {
commonStringEncoder.encode(Integer.toString(i));
}
assertThrows(IllegalStateException.class, () -> commonStringEncoder.encode("too many"));
}
}

Wyświetl plik

@ -1,32 +1,134 @@
package com.onthegomap.flatmap.collections;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import com.onthegomap.flatmap.RenderedFeature;
import com.onthegomap.flatmap.monitoring.Stats.InMemory;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
public class MergeSortFeatureMapTest {
public abstract class MergeSortFeatureMapTest {
@TempDir
Path tmpDir;
tmp
@Test
public void testEmpty() {
var features = new MergeSortFeatureMap(tmpDir, new InMemory());
features.sort();
assertFalse(features.iterator().hasNext());
public void test() {
}
@Test
public void testThrowsWhenPreparedOutOfOrder() {
var features = new MergeSortFeatureMap(tmpDir, new InMemory());
features.accept(new RenderedFeature(1, new byte[]{}));
assertThrows(IllegalStateException.class, features::iterator);
features.sort();
assertThrows(IllegalStateException.class, () -> features.accept(new RenderedFeature(1, new byte[]{})));
}
// private MergeSortFeatureMap features;
//
// @Before
// public void setup() {
// this.features = getMap(new Profile.NullProfile());
// }
//
// protected abstract MergeSortFeatureMap getMap(Profile profile);
//
// @TempDir
// Path tmpDir;
//
// @Test
// public void testEmpty() {
// features.sort();
// assertFalse(features.iterator().hasNext());
// }
//
// @Test
// public void testThrowsWhenPreparedOutOfOrder() {
// features.accept(new RenderedFeature(1, new byte[]{}));
// assertThrows(IllegalStateException.class, features::iterator);
// features.sort();
// assertThrows(IllegalStateException.class, () -> features.accept(new RenderedFeature(1, new byte[]{})));
// }
//
// @Test
// public void test() {
// features.accept(FeatureMapKey.);
// features.sort();
// var actual = StreamSupport.stream(features.spliterator(), false).toList();
// assertEquals(List.of(
// new TileFeatures().add
// ), actual);
// }
//
// public static class TwoWorkers extends MergeSortFeatureMapTest {
//
// @Override
// protected MergeSortFeatureMap getMap(Profile profile) {
// return new MergeSortFeatureMap(tmpDir, profile, 2, 1_000, new InMemory());
// }
// }
//
// public static class MergeSortOnePerFileFeatureMapTest extends MergeSortFeatureMapTest {
//
// @Override
// protected MergeSortFeatureMap getMap(Profile profile) {
// return new MergeSortFeatureMap(tmpDir, profile, 2, 1, new InMemory());
// }
// }
//
//
// public static class MergeSortOnePerFileOneWorkerFeatureMapTest extends MergeSortFeatureMapTest {
//
// @Override
// protected MergeSortFeatureMap getMap(Profile profile) {
// return new MergeSortFeatureMap(tmpDir, profile, 1, 1_000_00, new InMemory());
// }
// }
//
// public static class FeatureMapKeyTest {
//
// @TestFactory
// public List<DynamicTest> testEncodeLongKey() {
// List<TileCoord> tiles = List.of(
// TileCoord.ofXYZ(0, 0, 14),
// TileCoord.ofXYZ((1 << 14) - 1, (1 << 14) - 1, 14),
// TileCoord.ofXYZ(0, 0, 0),
// TileCoord.ofXYZ(0, 0, 7),
// TileCoord.ofXYZ((1 << 7) - 1, (1 << 7) - 1, 7)
// );
// List<Byte> layers = List.of((byte) 0, (byte) 1, (byte) 255);
// List<Integer> zOrders = List.of((1 << 22) - 1, 0, -(1 << 22));
// List<Boolean> hasGroups = List.of(false, true);
// List<DynamicTest> result = new ArrayList<>();
// for (TileCoord tile : tiles) {
// for (byte layer : layers) {
// for (int zOrder : zOrders) {
// for (boolean hasGroup : hasGroups) {
// FeatureMapKey key = FeatureMapKey.of(tile.encoded(), layer, zOrder, hasGroup);
// result.add(dynamicTest(key.toString(), () -> {
// FeatureMapKey decoded = FeatureMapKey.decode(key.encoded());
// assertEquals(decoded.tile(), tile.encoded(), "tile");
// assertEquals(decoded.layer(), layer, "layer");
// assertEquals(decoded.zOrder(), zOrder, "zOrder");
// assertEquals(decoded.hasGroup(), hasGroup, "hasGroup");
// }));
// }
// }
// }
// }
// return result;
// }
//
// @ParameterizedTest
// @CsvSource({
// "0,0,-2,true, 0,0,-1,false",
// "0,0,1,false, 0,0,2,false",
// "0,0,-1,false, 0,0,1,false",
// "-1,0,-2,false, -1,0,-1,false",
// "-1,0,1,false, -1,0,2,false",
// "-1,0,-1,false, -1,0,1,false",
// "-1,0,-1,false, -1,0,-1,true",
// "1,0,1,false, 1,0,1,true"
// })
// public void testEncodeLongKeyOrdering(
// int tileA, byte layerA, int zOrderA, boolean hasGroupA,
// int tileB, byte layerB, int zOrderB, boolean hasGroupB
// ) {
// assertTrue(
// FeatureMapKey.encode(tileA, layerA, zOrderA, hasGroupA)
// <
// FeatureMapKey.encode(tileB, layerB, zOrderB, hasGroupB)
// );
// }
// }
}

Wyświetl plik

@ -0,0 +1,85 @@
package com.onthegomap.flatmap.collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.onthegomap.flatmap.monitoring.Stats;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
public class MergeSortTest {
@TempDir
Path tmpDir;
private static MergeSort.Entry newEntry(int i) {
return new MergeSort.Entry(i, new byte[]{(byte) i});
}
private MergeSort newSorter(int workers, int chunkSizeLimit) {
return new MergeSort(tmpDir, workers, chunkSizeLimit, new Stats.InMemory());
}
@Test
public void testEmpty() {
MergeSort sorter = newSorter(1, 100);
sorter.sort();
assertEquals(List.of(), sorter.toList());
}
@Test
public void testSingle() {
MergeSort sorter = newSorter(1, 100);
sorter.add(newEntry(1));
sorter.sort();
assertEquals(List.of(newEntry(1)), sorter.toList());
}
@Test
public void testTwoItemsOneChunk() {
MergeSort sorter = newSorter(1, 100);
sorter.add(newEntry(2));
sorter.add(newEntry(1));
sorter.sort();
assertEquals(List.of(newEntry(1), newEntry(2)), sorter.toList());
}
@Test
public void testTwoItemsTwoChunks() {
MergeSort sorter = newSorter(1, 0);
sorter.add(newEntry(2));
sorter.add(newEntry(1));
sorter.sort();
assertEquals(List.of(newEntry(1), newEntry(2)), sorter.toList());
}
@Test
public void testTwoWorkers() {
MergeSort sorter = newSorter(2, 0);
sorter.add(newEntry(4));
sorter.add(newEntry(3));
sorter.add(newEntry(2));
sorter.add(newEntry(1));
sorter.sort();
assertEquals(List.of(newEntry(1), newEntry(2), newEntry(3), newEntry(4)), sorter.toList());
}
@Test
public void testManyItems() {
List<MergeSort.Entry> sorted = new ArrayList<>();
List<MergeSort.Entry> shuffled = new ArrayList<>();
for (int i = 0; i < 10_000; i++) {
shuffled.add(newEntry(i));
sorted.add(newEntry(i));
}
Collections.shuffle(shuffled, new Random(0));
MergeSort sorter = newSorter(2, 20_000);
shuffled.forEach(sorter::add);
sorter.sort();
assertEquals(sorted, sorter.toList());
}
}