kopia lustrzana https://github.com/onthegomap/planetiler
Fix sonar issues with unclosed resources (#195)
rodzic
4c67e10ed5
commit
2db49fc76f
|
@ -111,6 +111,7 @@ class ExternalMergeSort implements FeatureSort {
|
|||
}
|
||||
|
||||
private DataInputStream newInputStream(Path path) throws IOException {
|
||||
@SuppressWarnings("java:S2095") // DataInputStream closes inputStream
|
||||
InputStream inputStream = new BufferedInputStream(Files.newInputStream(path), 50_000);
|
||||
if (gzip) {
|
||||
inputStream = new GZIPInputStream(inputStream);
|
||||
|
@ -119,6 +120,7 @@ class ExternalMergeSort implements FeatureSort {
|
|||
}
|
||||
|
||||
private DataOutputStream newOutputStream(Path path) throws IOException {
|
||||
@SuppressWarnings("java:S2095") // DataInputStream closes inputStream
|
||||
OutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(path), 50_000);
|
||||
if (gzip) {
|
||||
outputStream = new FastGzipOutputStream(outputStream);
|
||||
|
|
|
@ -12,6 +12,7 @@ import com.onthegomap.planetiler.stats.Stats;
|
|||
import com.onthegomap.planetiler.util.CommonStringEncoder;
|
||||
import com.onthegomap.planetiler.util.DiskBacked;
|
||||
import com.onthegomap.planetiler.util.LayerStats;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
|
@ -132,15 +133,16 @@ public final class FeatureGroup implements Consumer<SortableFeature>, Iterable<F
|
|||
return sorter.numFeaturesWritten();
|
||||
}
|
||||
|
||||
/** Returns a function for a single thread to use to serialize rendered features. */
|
||||
public Function<RenderedFeature, SortableFeature> newRenderedFeatureEncoder() {
|
||||
// This method gets called billions of times when generating the planet, so these optimizations make a big difference:
|
||||
// 1) Re-use the same buffer packer to avoid allocating and resizing new byte arrays for every feature.
|
||||
var packer = MessagePack.newDefaultBufferPacker();
|
||||
// 2) Avoid a ThreadLocal lookup on every layer stats call by getting the handler for this thread once
|
||||
var threadLocalLayerStats = layerStats.handlerForThread();
|
||||
public interface RenderedFeatureEncoder extends Function<RenderedFeature, SortableFeature>, Closeable {}
|
||||
|
||||
return new Function<>() {
|
||||
/** Returns a function for a single thread to use to serialize rendered features. */
|
||||
public RenderedFeatureEncoder newRenderedFeatureEncoder() {
|
||||
return new RenderedFeatureEncoder() {
|
||||
// This method gets called billions of times when generating the planet, so these optimizations make a big difference:
|
||||
// 1) Re-use the same buffer packer to avoid allocating and resizing new byte arrays for every feature.
|
||||
private final MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
|
||||
// 2) Avoid a ThreadLocal lookup on every layer stats call by getting the handler for this thread once
|
||||
private final Consumer<RenderedFeature> threadLocalLayerStats = layerStats.handlerForThread();
|
||||
// 3) Avoid re-encoding values for identical filled geometries (i.e. ocean) by memoizing the encoded values
|
||||
// FeatureRenderer ensures that a separate VectorTileEncoder.Feature is used for each zoom level
|
||||
private VectorTile.Feature lastFeature = null;
|
||||
|
@ -163,6 +165,11 @@ public final class FeatureGroup implements Consumer<SortableFeature>, Iterable<F
|
|||
|
||||
return new SortableFeature(encodeKey(feature), encodedValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
packer.close();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -58,18 +58,19 @@ public abstract class SimpleReader implements Closeable {
|
|||
.addBuffer("read_queue", 1000)
|
||||
.<SortableFeature>addWorker("process", threads, (prev, next) -> {
|
||||
var featureCollectors = new FeatureCollector.Factory(config, stats);
|
||||
FeatureRenderer renderer = newFeatureRenderer(writer, config, next);
|
||||
for (SourceFeature sourceFeature : prev) {
|
||||
featuresRead.incrementAndGet();
|
||||
FeatureCollector features = featureCollectors.get(sourceFeature);
|
||||
if (sourceFeature.latLonGeometry().getEnvelopeInternal().intersects(latLonBounds)) {
|
||||
try {
|
||||
profile.processFeature(sourceFeature, features);
|
||||
for (FeatureCollector.Feature renderable : features) {
|
||||
renderer.accept(renderable);
|
||||
try (FeatureRenderer renderer = newFeatureRenderer(writer, config, next)) {
|
||||
for (SourceFeature sourceFeature : prev) {
|
||||
featuresRead.incrementAndGet();
|
||||
FeatureCollector features = featureCollectors.get(sourceFeature);
|
||||
if (sourceFeature.latLonGeometry().getEnvelopeInternal().intersects(latLonBounds)) {
|
||||
try {
|
||||
profile.processFeature(sourceFeature, features);
|
||||
for (FeatureCollector.Feature renderable : features) {
|
||||
renderer.accept(renderable);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error processing " + sourceFeature, e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error processing " + sourceFeature, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -77,7 +78,7 @@ public abstract class SimpleReader implements Closeable {
|
|||
// output large batches since each input may map to many tiny output features (i.e. slicing ocean tiles)
|
||||
// which turns enqueueing into the bottleneck
|
||||
.addBuffer("write_queue", 50_000, 1_000)
|
||||
.sinkToConsumer("write", 1, (item) -> {
|
||||
.sinkToConsumer("write", 1, item -> {
|
||||
featuresWritten.incrementAndGet();
|
||||
writer.accept(item);
|
||||
});
|
||||
|
@ -94,21 +95,21 @@ public abstract class SimpleReader implements Closeable {
|
|||
pipeline.awaitAndLog(loggers, config.logInterval());
|
||||
|
||||
// hook for profile to do any post-processing after this source is read
|
||||
profile.finish(sourceName,
|
||||
new FeatureCollector.Factory(config, stats),
|
||||
newFeatureRenderer(writer, config, writer)
|
||||
);
|
||||
try (var featureRenderer = newFeatureRenderer(writer, config, writer)) {
|
||||
profile.finish(sourceName, new FeatureCollector.Factory(config, stats), featureRenderer);
|
||||
}
|
||||
timer.stop();
|
||||
}
|
||||
|
||||
|
||||
private FeatureRenderer newFeatureRenderer(FeatureGroup writer, PlanetilerConfig config,
|
||||
Consumer<SortableFeature> next) {
|
||||
@SuppressWarnings("java:S2095") // closed by FeatureRenderer
|
||||
var encoder = writer.newRenderedFeatureEncoder();
|
||||
return new FeatureRenderer(
|
||||
config,
|
||||
rendered -> next.accept(encoder.apply(rendered)),
|
||||
stats
|
||||
stats,
|
||||
encoder
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -328,44 +328,44 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
|
|||
Counter blocks = blocksProcessed.counterForThread();
|
||||
Counter rels = relationsProcessed.counterForThread();
|
||||
|
||||
var phaser = pass2Phaser.forWorker();
|
||||
var featureCollectors = new FeatureCollector.Factory(config, stats);
|
||||
final NodeLocationProvider nodeLocations = newNodeLocationProvider();
|
||||
FeatureRenderer renderer = createFeatureRenderer(writer, config, next);
|
||||
var relationHandler = relationDistributor.forThread(relation -> {
|
||||
var feature = processRelationPass2(relation, nodeLocations);
|
||||
if (feature != null) {
|
||||
render(featureCollectors, renderer, relation, feature);
|
||||
}
|
||||
rels.inc();
|
||||
});
|
||||
|
||||
for (var block : prev) {
|
||||
for (var element : block.decodeElements()) {
|
||||
SourceFeature feature = null;
|
||||
if (element instanceof OsmElement.Node node) {
|
||||
phaser.arrive(OsmPhaser.Phase.NODES);
|
||||
feature = processNodePass2(node);
|
||||
} else if (element instanceof OsmElement.Way way) {
|
||||
phaser.arrive(OsmPhaser.Phase.WAYS);
|
||||
feature = processWayPass2(way, nodeLocations);
|
||||
} else if (element instanceof OsmElement.Relation relation) {
|
||||
phaser.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
|
||||
relationHandler.accept(relation);
|
||||
}
|
||||
// render features specified by profile and hand them off to next step that will
|
||||
// write them intermediate storage
|
||||
try (var renderer = createFeatureRenderer(writer, config, next)) {
|
||||
var phaser = pass2Phaser.forWorker();
|
||||
var relationHandler = relationDistributor.forThread(relation -> {
|
||||
var feature = processRelationPass2(relation, nodeLocations);
|
||||
if (feature != null) {
|
||||
render(featureCollectors, renderer, element, feature);
|
||||
render(featureCollectors, renderer, relation, feature);
|
||||
}
|
||||
rels.inc();
|
||||
});
|
||||
for (var block : prev) {
|
||||
for (var element : block.decodeElements()) {
|
||||
SourceFeature feature = null;
|
||||
if (element instanceof OsmElement.Node node) {
|
||||
phaser.arrive(OsmPhaser.Phase.NODES);
|
||||
feature = processNodePass2(node);
|
||||
} else if (element instanceof OsmElement.Way way) {
|
||||
phaser.arrive(OsmPhaser.Phase.WAYS);
|
||||
feature = processWayPass2(way, nodeLocations);
|
||||
} else if (element instanceof OsmElement.Relation relation) {
|
||||
phaser.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
|
||||
relationHandler.accept(relation);
|
||||
}
|
||||
// render features specified by profile and hand them off to next step that will
|
||||
// write them intermediate storage
|
||||
if (feature != null) {
|
||||
render(featureCollectors, renderer, element, feature);
|
||||
}
|
||||
}
|
||||
blocks.inc();
|
||||
}
|
||||
blocks.inc();
|
||||
|
||||
phaser.close();
|
||||
|
||||
// do work for other threads that are still processing blocks of relations
|
||||
relationHandler.close();
|
||||
}
|
||||
|
||||
phaser.close();
|
||||
|
||||
// do work for other threads that are still processing blocks of relations
|
||||
relationHandler.close();
|
||||
}).addBuffer("feature_queue", 50_000, 1_000)
|
||||
// FeatureGroup writes need to be single-threaded
|
||||
.sinkToConsumer("write", 1, writer);
|
||||
|
@ -392,10 +392,8 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
|
|||
|
||||
timer.stop();
|
||||
|
||||
try {
|
||||
profile.finish(name,
|
||||
new FeatureCollector.Factory(config, stats),
|
||||
createFeatureRenderer(writer, config, writer));
|
||||
try (var renderer = createFeatureRenderer(writer, config, writer)) {
|
||||
profile.finish(name, new FeatureCollector.Factory(config, stats), renderer);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error calling profile.finish", e);
|
||||
}
|
||||
|
@ -468,11 +466,13 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
|
|||
|
||||
private FeatureRenderer createFeatureRenderer(FeatureGroup writer, PlanetilerConfig config,
|
||||
Consumer<SortableFeature> next) {
|
||||
@SuppressWarnings("java:S2095") // closed by FeatureRenderer
|
||||
var encoder = writer.newRenderedFeatureEncoder();
|
||||
return new FeatureRenderer(
|
||||
config,
|
||||
rendered -> next.accept(encoder.apply(rendered)),
|
||||
stats
|
||||
stats,
|
||||
encoder
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -9,6 +9,9 @@ import com.onthegomap.planetiler.geo.GeometryException;
|
|||
import com.onthegomap.planetiler.geo.TileCoord;
|
||||
import com.onthegomap.planetiler.geo.TileExtents;
|
||||
import com.onthegomap.planetiler.stats.Stats;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -35,7 +38,7 @@ import org.slf4j.LoggerFactory;
|
|||
* Converts source features geometries to encoded vector tile features according to settings configured in the map
|
||||
* profile (like zoom range, min pixel size, output attributes and their zoom ranges).
|
||||
*/
|
||||
public class FeatureRenderer implements Consumer<FeatureCollector.Feature> {
|
||||
public class FeatureRenderer implements Consumer<FeatureCollector.Feature>, Closeable {
|
||||
|
||||
// generate globally-unique IDs shared by all vector tile features representing the same source feature
|
||||
private static final AtomicLong idGenerator = new AtomicLong(0);
|
||||
|
@ -51,12 +54,19 @@ public class FeatureRenderer implements Consumer<FeatureCollector.Feature> {
|
|||
private final PlanetilerConfig config;
|
||||
private final Consumer<RenderedFeature> consumer;
|
||||
private final Stats stats;
|
||||
private final Closeable closeable;
|
||||
|
||||
/** Constructs a new feature render that will send rendered features to {@code consumer}. */
|
||||
public FeatureRenderer(PlanetilerConfig config, Consumer<RenderedFeature> consumer, Stats stats) {
|
||||
public FeatureRenderer(PlanetilerConfig config, Consumer<RenderedFeature> consumer, Stats stats,
|
||||
Closeable closeable) {
|
||||
this.config = config;
|
||||
this.consumer = consumer;
|
||||
this.stats = stats;
|
||||
this.closeable = closeable;
|
||||
}
|
||||
|
||||
public FeatureRenderer(PlanetilerConfig config, Consumer<RenderedFeature> consumer, Stats stats) {
|
||||
this(config, consumer, stats, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -66,7 +76,7 @@ public class FeatureRenderer implements Consumer<FeatureCollector.Feature> {
|
|||
|
||||
private void renderGeometry(Geometry geom, FeatureCollector.Feature feature) {
|
||||
if (geom.isEmpty()) {
|
||||
LOGGER.warn("Empty geometry " + feature);
|
||||
LOGGER.warn("Empty geometry {}", feature);
|
||||
} else if (geom instanceof Point point) {
|
||||
renderPoint(feature, point.getCoordinates());
|
||||
} else if (geom instanceof MultiPoint points) {
|
||||
|
@ -79,8 +89,8 @@ public class FeatureRenderer implements Consumer<FeatureCollector.Feature> {
|
|||
renderGeometry(collection.getGeometryN(i), feature);
|
||||
}
|
||||
} else {
|
||||
LOGGER.warn(
|
||||
"Unrecognized JTS geometry type for " + feature.getClass().getSimpleName() + ": " + geom.getGeometryType());
|
||||
LOGGER.warn("Unrecognized JTS geometry type for {}: {}", feature.getClass().getSimpleName(),
|
||||
geom.getGeometryType());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -269,4 +279,15 @@ public class FeatureRenderer implements Consumer<FeatureCollector.Feature> {
|
|||
}
|
||||
return emitted;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (closeable != null) {
|
||||
try {
|
||||
closeable.close();
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
sonar.issue.ignore.multicriteria=js1659,js3358,js1172,js106,js125,js2699
|
||||
sonar.issue.ignore.multicriteria=js1659,js3358,js1172,js106,js125,js2699,js3776
|
||||
# subjective
|
||||
sonar.issue.ignore.multicriteria.js1659.ruleKey=java:S1659
|
||||
sonar.issue.ignore.multicriteria.js1659.resourceKey=**/*.java
|
||||
sonar.issue.ignore.multicriteria.js3358.ruleKey=java:S3358
|
||||
sonar.issue.ignore.multicriteria.js3358.resourceKey=**/*.java
|
||||
sonar.issue.ignore.multicriteria.jsS106.ruleKey=java:S106
|
||||
sonar.issue.ignore.multicriteria.jsS106.resourceKey=**/*.java
|
||||
sonar.issue.ignore.multicriteria.jsS125.ruleKey=java:S125
|
||||
sonar.issue.ignore.multicriteria.jsS125.resourceKey=**/*.java
|
||||
sonar.issue.ignore.multicriteria.js106.ruleKey=java:S106
|
||||
sonar.issue.ignore.multicriteria.js106.resourceKey=**/*.java
|
||||
sonar.issue.ignore.multicriteria.js125.ruleKey=java:S125
|
||||
sonar.issue.ignore.multicriteria.js125.resourceKey=**/*.java
|
||||
sonar.issue.ignore.multicriteria.js3776.ruleKey=java:S3776
|
||||
sonar.issue.ignore.multicriteria.js3776.resourceKey=**/*.java
|
||||
|
||||
# layer constructors need same signatures
|
||||
sonar.issue.ignore.multicriteria.js1172.ruleKey=java:S1172
|
||||
|
|
Ładowanie…
Reference in New Issue