2021-08-06 09:56:24 +00:00
|
|
|
package com.onthegomap.flatmap.collection;
|
2021-04-29 10:22:41 +00:00
|
|
|
|
2021-08-06 09:56:24 +00:00
|
|
|
import com.onthegomap.flatmap.config.CommonParams;
import com.onthegomap.flatmap.stats.ProcessInfo;
import com.onthegomap.flatmap.stats.ProgressLoggers;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.util.FileUtils;
import com.onthegomap.flatmap.worker.WorkerPipeline;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
|
|
|
|
|
2021-04-30 10:31:56 +00:00
|
|
|
class ExternalMergeSort implements FeatureSort {
|
2021-04-29 10:22:41 +00:00
|
|
|
|
2021-04-30 10:31:56 +00:00
|
|
|
private static final Logger LOGGER = LoggerFactory.getLogger(FeatureSort.class);
|
2021-04-29 10:22:41 +00:00
|
|
|
|
|
|
|
private static final long MAX_CHUNK_SIZE = 1_000_000_000; // 1GB
|
|
|
|
|
|
|
|
private final Path dir;
|
|
|
|
private final Stats stats;
|
|
|
|
private final int chunkSizeLimit;
|
|
|
|
private final int workers;
|
2021-05-03 09:57:26 +00:00
|
|
|
private final AtomicLong features = new AtomicLong(0);
|
2021-04-29 10:22:41 +00:00
|
|
|
|
|
|
|
private final List<Chunk> chunks = new ArrayList<>();
|
2021-07-17 10:51:13 +00:00
|
|
|
private final boolean gzip;
|
2021-07-30 09:32:10 +00:00
|
|
|
private final CommonParams config;
|
2021-04-29 10:22:41 +00:00
|
|
|
private Chunk current;
|
|
|
|
private volatile boolean sorted = false;
|
|
|
|
|
2021-07-30 11:07:00 +00:00
|
|
|
/**
 * Creates a merge sort whose worker count, chunk size and gzip setting are taken from {@code config}.
 *
 * <p>The chunk size is half of the maximum memory reported by {@link ProcessInfo}, divided by the
 * number of threads, and never more than {@code MAX_CHUNK_SIZE}.
 *
 * @param tempDir directory that will hold the chunk files
 * @param config  supplies {@code threads()}, {@code gzipTempStorage()} and is passed through unchanged
 * @param stats   passed through to the main constructor
 */
ExternalMergeSort(Path tempDir, CommonParams config, Stats stats) {
  this(
    tempDir,
    config.threads(),
    defaultChunkSizeLimit(ProcessInfo.getMaxMemoryBytes(), config.threads()),
    config.gzipTempStorage(),
    config,
    stats
  );
}

/** Returns half of {@code maxMemoryBytes} split across {@code threads}, capped at {@code MAX_CHUNK_SIZE}. */
private static int defaultChunkSizeLimit(long maxMemoryBytes, int threads) {
  long perThreadBudget = (maxMemoryBytes / 2) / threads;
  return (int) Math.min(MAX_CHUNK_SIZE, perThreadBudget);
}
|
|
|
|
|
2021-07-30 09:32:10 +00:00
|
|
|
/**
 * Creates a merge sort with explicit settings, wipes and recreates {@code dir}, and opens the first chunk.
 *
 * @param dir            directory that holds the chunk files; deleted and recreated here
 * @param workers        number of workers passed to the sort pipeline
 * @param chunkSizeLimit estimated in-memory size after which a new chunk is started
 * @param gzip           whether chunk files are gzip-compressed
 * @param config         used later for the progress log interval
 * @param stats          used later to time the sort stage
 * @throws IllegalStateException if {@code chunkSizeLimit} is more than half of the maximum memory, or
 *                               if preparing {@code dir} or the first chunk fails with an I/O error
 */
ExternalMergeSort(Path dir, int workers, int chunkSizeLimit, boolean gzip, CommonParams config, Stats stats) {
  // Reject impossible chunk sizes before touching anything else.
  long availableMemory = ProcessInfo.getMaxMemoryBytes();
  if (chunkSizeLimit > availableMemory / 2) {
    throw new IllegalStateException(
      "Not enough memory to use chunk size " + chunkSizeLimit + " only have " + availableMemory);
  }

  this.dir = dir;
  this.workers = workers;
  this.chunkSizeLimit = chunkSizeLimit;
  this.gzip = gzip;
  this.config = config;
  this.stats = stats;

  LOGGER.info("Using merge sort feature map, chunk size=" + (chunkSizeLimit / 1_000_000) + "mb workers=" + workers);

  try {
    // Start from an empty directory so leftovers from an earlier run are not picked up.
    FileUtils.deleteDirectory(dir);
    Files.createDirectories(dir);
    newChunk();
  } catch (IOException e) {
    throw new IllegalStateException(e);
  }
}
|
|
|
|
|
2021-07-17 10:51:13 +00:00
|
|
|
private DataInputStream newInputStream(Path path) throws IOException {
|
|
|
|
InputStream inputStream = new BufferedInputStream(Files.newInputStream(path), 50_000);
|
|
|
|
if (gzip) {
|
|
|
|
inputStream = new GZIPInputStream(inputStream);
|
|
|
|
}
|
|
|
|
return new DataInputStream(inputStream);
|
|
|
|
}
|
|
|
|
|
|
|
|
private DataOutputStream newOutputStream(Path path) throws IOException {
|
|
|
|
OutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(path), 50_000);
|
|
|
|
if (gzip) {
|
2021-07-17 19:50:26 +00:00
|
|
|
outputStream = new FastGzipOutputStream(outputStream);
|
2021-07-17 10:51:13 +00:00
|
|
|
}
|
|
|
|
return new DataOutputStream(outputStream);
|
|
|
|
}
|
|
|
|
|
2021-04-29 10:22:41 +00:00
|
|
|
@Override
|
|
|
|
public void add(Entry item) {
|
|
|
|
try {
|
|
|
|
assert !sorted;
|
2021-05-03 09:57:26 +00:00
|
|
|
features.incrementAndGet();
|
2021-04-29 10:22:41 +00:00
|
|
|
current.add(item);
|
|
|
|
if (current.bytesInMemory > chunkSizeLimit) {
|
|
|
|
newChunk();
|
|
|
|
}
|
|
|
|
} catch (IOException e) {
|
|
|
|
throw new IllegalStateException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public long getStorageSize() {
|
2021-05-04 10:17:10 +00:00
|
|
|
return FileUtils.directorySize(dir);
|
2021-04-29 10:22:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
private static <T> T time(AtomicLong timer, Supplier<T> func) {
|
|
|
|
long start = System.nanoTime();
|
|
|
|
try {
|
|
|
|
return func.get();
|
|
|
|
} finally {
|
|
|
|
timer.addAndGet(System.nanoTime() - start);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void sort() {
|
|
|
|
assert !sorted;
|
|
|
|
if (current != null) {
|
|
|
|
try {
|
|
|
|
current.close();
|
|
|
|
} catch (IOException e) {
|
|
|
|
// ok
|
|
|
|
}
|
|
|
|
}
|
2021-08-10 10:55:30 +00:00
|
|
|
var timer = stats.startStage("sort");
|
2021-04-29 10:22:41 +00:00
|
|
|
AtomicLong reading = new AtomicLong(0);
|
|
|
|
AtomicLong writing = new AtomicLong(0);
|
|
|
|
AtomicLong sorting = new AtomicLong(0);
|
|
|
|
AtomicLong doneCounter = new AtomicLong(0);
|
|
|
|
|
2021-08-05 11:09:52 +00:00
|
|
|
var pipeline = WorkerPipeline.start("sort", stats)
|
2021-04-29 10:22:41 +00:00
|
|
|
.readFromTiny("item_queue", chunks)
|
|
|
|
.sinkToConsumer("worker", workers, chunk -> {
|
|
|
|
var toSort = time(reading, chunk::readAll);
|
|
|
|
time(sorting, toSort::sort);
|
|
|
|
time(writing, toSort::flush);
|
|
|
|
doneCounter.incrementAndGet();
|
|
|
|
});
|
|
|
|
|
|
|
|
ProgressLoggers loggers = new ProgressLoggers("sort")
|
|
|
|
.addPercentCounter("chunks", chunks.size(), doneCounter)
|
|
|
|
.addFileSize(this::getStorageSize)
|
2021-08-10 10:55:30 +00:00
|
|
|
.newLine()
|
2021-04-29 10:22:41 +00:00
|
|
|
.addProcessStats()
|
2021-08-10 10:55:30 +00:00
|
|
|
.newLine()
|
2021-08-05 11:09:52 +00:00
|
|
|
.addPipelineStats(pipeline);
|
2021-04-29 10:22:41 +00:00
|
|
|
|
2021-08-05 11:09:52 +00:00
|
|
|
pipeline.awaitAndLog(loggers, config.logInterval());
|
2021-04-29 10:22:41 +00:00
|
|
|
|
|
|
|
sorted = true;
|
2021-06-08 00:55:23 +00:00
|
|
|
timer.stop();
|
|
|
|
LOGGER.info("read:" + Duration.ofNanos(reading.get()).toSeconds() +
|
2021-04-29 10:22:41 +00:00
|
|
|
"s write:" + Duration.ofNanos(writing.get()).toSeconds() +
|
|
|
|
"s sort:" + Duration.ofNanos(sorting.get()).toSeconds() + "s");
|
|
|
|
}
|
|
|
|
|
2021-05-03 09:57:26 +00:00
|
|
|
@Override
|
|
|
|
public long size() {
|
|
|
|
return features.get();
|
|
|
|
}
|
|
|
|
|
2021-04-29 10:22:41 +00:00
|
|
|
@NotNull
|
|
|
|
@Override
|
|
|
|
public Iterator<Entry> iterator() {
|
|
|
|
assert sorted;
|
|
|
|
PriorityQueue<PeekableScanner> queue = new PriorityQueue<>(chunks.size());
|
|
|
|
for (Chunk chunk : chunks) {
|
|
|
|
if (chunk.itemCount > 0) {
|
|
|
|
queue.add(chunk.newReader());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return new Iterator<>() {
|
|
|
|
@Override
|
|
|
|
public boolean hasNext() {
|
|
|
|
return !queue.isEmpty();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Entry next() {
|
|
|
|
PeekableScanner scanner = queue.poll();
|
|
|
|
assert scanner != null;
|
|
|
|
Entry next = scanner.next();
|
|
|
|
if (scanner.hasNext()) {
|
|
|
|
queue.add(scanner);
|
|
|
|
}
|
|
|
|
return next;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
private void newChunk() throws IOException {
|
|
|
|
Path chunkPath = dir.resolve("chunk" + (chunks.size() + 1));
|
|
|
|
chunkPath.toFile().deleteOnExit();
|
|
|
|
if (current != null) {
|
|
|
|
current.close();
|
|
|
|
}
|
|
|
|
chunks.add(current = new Chunk(chunkPath));
|
|
|
|
}
|
|
|
|
|
2021-07-17 10:51:13 +00:00
|
|
|
private class Chunk implements Closeable {
|
2021-04-29 10:22:41 +00:00
|
|
|
|
|
|
|
private final Path path;
|
|
|
|
private final DataOutputStream outputStream;
|
|
|
|
private int bytesInMemory = 0;
|
|
|
|
private int itemCount = 0;
|
|
|
|
|
|
|
|
private Chunk(Path path) throws IOException {
|
|
|
|
this.path = path;
|
2021-07-17 10:51:13 +00:00
|
|
|
this.outputStream = newOutputStream(path);
|
2021-04-29 10:22:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
public PeekableScanner newReader() {
|
|
|
|
return new PeekableScanner(path, itemCount);
|
|
|
|
}
|
|
|
|
|
|
|
|
public void add(Entry entry) throws IOException {
|
|
|
|
write(outputStream, entry);
|
|
|
|
bytesInMemory +=
|
|
|
|
// pointer to feature
|
|
|
|
8 +
|
|
|
|
// Feature class overhead
|
|
|
|
16 +
|
|
|
|
// long sort member of feature
|
|
|
|
8 +
|
|
|
|
// byte array pointer
|
|
|
|
8 +
|
|
|
|
// byte array size
|
|
|
|
24 + entry.value().length;
|
|
|
|
itemCount++;
|
|
|
|
}
|
|
|
|
|
|
|
|
public class SortableChunk {
|
|
|
|
|
|
|
|
private Entry[] featuresToSort;
|
|
|
|
|
|
|
|
private SortableChunk(Entry[] featuresToSort) {
|
|
|
|
this.featuresToSort = featuresToSort;
|
|
|
|
}
|
|
|
|
|
|
|
|
public SortableChunk sort() {
|
|
|
|
Arrays.sort(featuresToSort);
|
|
|
|
return this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public SortableChunk flush() {
|
2021-07-17 10:51:13 +00:00
|
|
|
try (DataOutputStream out = newOutputStream(path)) {
|
2021-04-29 10:22:41 +00:00
|
|
|
for (Entry feature : featuresToSort) {
|
|
|
|
write(out, feature);
|
|
|
|
}
|
|
|
|
featuresToSort = null;
|
|
|
|
return this;
|
|
|
|
} catch (IOException e) {
|
|
|
|
throw new IllegalStateException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public SortableChunk readAll() {
|
|
|
|
try (PeekableScanner scanner = newReader()) {
|
|
|
|
Entry[] featuresToSort = new Entry[itemCount];
|
|
|
|
int i = 0;
|
|
|
|
while (scanner.hasNext()) {
|
|
|
|
featuresToSort[i] = scanner.next();
|
|
|
|
i++;
|
|
|
|
}
|
|
|
|
if (i != itemCount) {
|
|
|
|
throw new IllegalStateException("Expected " + itemCount + " features in " + path + " got " + i);
|
|
|
|
}
|
|
|
|
return new SortableChunk(featuresToSort);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public static void write(DataOutputStream out, Entry entry) throws IOException {
|
|
|
|
out.writeLong(entry.sortKey());
|
|
|
|
out.writeInt(entry.value().length);
|
|
|
|
out.write(entry.value());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void close() throws IOException {
|
|
|
|
outputStream.close();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-17 10:51:13 +00:00
|
|
|
private class PeekableScanner implements Closeable, Comparable<PeekableScanner>, Iterator<Entry> {
|
2021-04-29 10:22:41 +00:00
|
|
|
|
|
|
|
private final int count;
|
|
|
|
private int read = 0;
|
|
|
|
private final DataInputStream input;
|
|
|
|
private Entry next;
|
|
|
|
|
|
|
|
PeekableScanner(Path path, int count) {
|
|
|
|
this.count = count;
|
|
|
|
try {
|
2021-07-17 10:51:13 +00:00
|
|
|
input = newInputStream(path);
|
2021-04-29 10:22:41 +00:00
|
|
|
next = readNextFeature();
|
|
|
|
} catch (IOException e) {
|
|
|
|
throw new IllegalStateException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasNext() {
|
|
|
|
return next != null;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Entry next() {
|
|
|
|
Entry current = next;
|
|
|
|
if ((next = readNextFeature()) == null) {
|
|
|
|
close();
|
|
|
|
}
|
|
|
|
return current;
|
|
|
|
}
|
|
|
|
|
|
|
|
private Entry readNextFeature() {
|
|
|
|
if (read < count) {
|
|
|
|
try {
|
|
|
|
long nextSort = input.readLong();
|
|
|
|
int length = input.readInt();
|
|
|
|
byte[] bytes = input.readNBytes(length);
|
|
|
|
read++;
|
|
|
|
return new Entry(nextSort, bytes);
|
|
|
|
} catch (IOException e) {
|
|
|
|
throw new IllegalStateException(e);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void close() {
|
|
|
|
try {
|
|
|
|
input.close();
|
|
|
|
} catch (IOException e) {
|
|
|
|
LOGGER.warn("Error closing chunk", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public int compareTo(@NotNull PeekableScanner o) {
|
|
|
|
return next.compareTo(o.next);
|
|
|
|
}
|
|
|
|
}
|
2021-08-06 09:56:24 +00:00
|
|
|
|
|
|
|
private static class FastGzipOutputStream extends GZIPOutputStream {
|
|
|
|
|
|
|
|
public FastGzipOutputStream(OutputStream out) throws IOException {
|
|
|
|
super(out);
|
|
|
|
def.setLevel(Deflater.BEST_SPEED);
|
|
|
|
}
|
|
|
|
}
|
2021-04-29 10:22:41 +00:00
|
|
|
}
|