Refactor results writers to support dual access grids
The `MultiGridResultWriter` and `GridResultWriter` can be reused for dual access grids with minor changes.

Regular accessibility grids use time cutoffs, while dual access grids use thresholds. This change mainly moves the `channels` parameter up to the call site so that it can count either cutoffs or thresholds (channels, cutoffs, thresholds: the naming here could be confusing).
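
As a minimal sketch of the distinction (`cutoffsMinutes` is a real `RegionalTask` field visible in the diff below; the `dualAccessThresholds` array is hypothetical, added here only for illustration):

    // Hypothetical helper, for illustration only; not part of this commit.
    static int channelCount(int[] cutoffsMinutes, int[] dualAccessThresholds, boolean dualAccess) {
        // Regular accessibility grids store one value per travel time cutoff;
        // dual access grids store one value per threshold.
        return dualAccess ? dualAccessThresholds.length : cutoffsMinutes.length;
    }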

This change additionally does some minor refactoring: the file name is now passed to `BaseResultWriter`'s constructor, so it no longer needs to be stored separately and passed in at `finish()`.
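
As a sketch of the resulting call pattern (the constructor and `WebMercatorExtents.forTask` are the ones introduced in this diff, but the surrounding wiring is illustrative; the constructor and `finish()` are package-visible, so code like this would live alongside the writers):

    // Illustrative only: the writer is given its final file name at construction,
    // so finish() no longer takes any arguments.
    GridResultWriter writer = new GridResultWriter(
        WebMercatorExtents.forTask(task),   // grid bounds derived from the task
        task.cutoffsMinutes.length,         // channels: one value per cutoff
        String.format("%s_%s_P%d.access", analysisId, pointSetId, percentile),
        fileStorage
    );
    // ... the assembler writes one origin at a time ...
    writer.finish();  // gzips the buffer and moves it into file storage under that name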
trevorgerhardt committed Feb 17, 2025
1 parent 6f35697 commit 28cb70a
Showing 9 changed files with 72 additions and 113 deletions.
src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java
@@ -11,12 +11,7 @@
 public class AccessCsvResultWriter extends CsvResultWriter {
 
     public AccessCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException {
-        super(task, fileStorage);
-    }
-
-    @Override
-    public CsvResultType resultType () {
-        return CsvResultType.ACCESS;
+        super(task, CsvResultType.ACCESS, fileStorage);
     }
 
     @Override
src/main/java/com/conveyal/analysis/results/BaseResultWriter.java (5 additions, 16 deletions)
@@ -1,6 +1,5 @@
 package com.conveyal.analysis.results;
 
-import com.conveyal.file.FileCategory;
 import com.conveyal.file.FileStorage;
 import com.conveyal.file.FileStorageKey;
 import com.conveyal.file.FileUtils;
@@ -31,30 +30,20 @@ public abstract class BaseResultWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(BaseResultWriter.class);
 
+    public final String fileName;
     private final FileStorage fileStorage;
 
-    protected File bufferFile;
+    protected File bufferFile = FileUtils.createScratchFile();
 
-    public BaseResultWriter (FileStorage fileStorage) {
+    public BaseResultWriter(String fileName, FileStorage fileStorage) {
+        this.fileName = fileName;
         this.fileStorage = fileStorage;
     }
 
-    // Can this be merged into the constructor?
-    protected void prepare (String jobId) {
-        try {
-            bufferFile = File.createTempFile(jobId + "_", ".results");
-            // On unexpected server shutdown, these files should be deleted.
-            // We could attempt to recover from shutdowns but that will take a lot of changes and persisted data.
-            bufferFile.deleteOnExit();
-        } catch (IOException e) {
-            LOG.error("Exception while creating buffer file for multi-origin assembler: " + e.toString());
-        }
-    }
-
     /**
      * Gzip the access grid and store it.
      */
-    protected synchronized void finish (String fileName) throws IOException {
+    protected synchronized void finish() throws IOException {
         LOG.info("Compressing {} and moving into file storage.", fileName);
         FileStorageKey fileStorageKey = new FileStorageKey(RESULTS, fileName);
         File gzippedResultFile = FileUtils.createScratchFile();
src/main/java/com/conveyal/analysis/results/CsvResultWriter.java (6 additions, 9 deletions)
@@ -13,7 +13,6 @@
 import java.io.IOException;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 /**
@@ -25,7 +24,6 @@ public abstract class CsvResultWriter extends BaseResultWriter implements RegionalResultWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(CsvResultWriter.class);
 
-    public final String fileName;
     private final CsvWriter csvWriter;
     private int nDataColumns;
 
@@ -39,7 +37,7 @@ public abstract class CsvResultWriter extends BaseResultWriter implements RegionalResultWriter {
      * Override to return an Enum (usable as String) identifying the kind of results recorded by this CSV writer.
      * This serves as a filename suffix to distinguish between different CSVs generated by a single regional analysis.
      */
-    public abstract CsvResultType resultType ();
+    public final CsvResultType resultType;
 
     /**
      * Override to provide column names for this CSV writer.
@@ -60,16 +58,15 @@ public abstract class CsvResultWriter extends BaseResultWriter implements RegionalResultWriter {
      * "origin", "destination", and the supplied indicator.
      * FIXME it's strange we're manually passing injectable components into objects not wired up at application construction.
      */
-    CsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException {
-        super(fileStorage);
+    CsvResultWriter(RegionalTask task, CsvResultType resultType, FileStorage fileStorage) throws IOException {
+        super(task.jobId + "_" + resultType + ".csv", fileStorage);
         checkArgument(task.originPointSet != null, "CsvResultWriters require FreeFormPointSet origins.");
-        super.prepare(task.jobId);
-        this.fileName = task.jobId + "_" + resultType() +".csv";
         BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bufferFile));
         csvWriter = new CsvWriter(bufferedWriter, ',');
         setDataColumns(columnHeaders());
+        this.resultType = resultType;
        this.task = task;
-        LOG.info("Created CSV file to hold {} results for regional job {}", resultType(), task.jobId);
+        LOG.info("Created CSV file to hold {} results for regional job {}", resultType, task.jobId);
     }
 
     /**
Expand All @@ -89,7 +86,7 @@ protected void setDataColumns(String... columns) throws IOException {
@Override
public synchronized void finish () throws IOException {
csvWriter.close();
super.finish(this.fileName);
super.finish();
}

/**
Expand Down
src/main/java/com/conveyal/analysis/results/GridResultWriter.java (24 additions, 25 deletions)
@@ -2,7 +2,7 @@
 
 import com.conveyal.file.FileStorage;
 import com.conveyal.r5.analyst.LittleEndianIntOutputStream;
-import com.conveyal.r5.analyst.cluster.RegionalTask;
+import com.conveyal.r5.analyst.WebMercatorExtents;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +41,7 @@ public class GridResultWriter extends BaseResultWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(GridResultWriter.class);
 
-    private RandomAccessFile randomAccessFile;
+    private final RandomAccessFile randomAccessFile;
 
     /** The version of the access grids we produce */
     private static final int ACCESS_GRID_VERSION = 0;
@@ -50,44 +50,43 @@ public class GridResultWriter extends BaseResultWriter {
     private static final long HEADER_LENGTH_BYTES = 9 * Integer.BYTES;
 
     /**
-     * The number of different travel time cutoffs being applied when computing accessibility for each origin.
+     * The number of different travel time cutoffs or dual access thresholds being applied when computing values for
+     * each origin.
      * The number of values stored per origin cell in an accessibility results grid.
-     * Note that we're storing only the number of different cutoffs, but not the cutoff values themselves in the file.
-     * This means that the files can only be properly interpreted with the Mongo metadata from the regional analysis.
+     * Note that we're storing only the number of different channels, but not the values themselves in the file.
+     * This means that the files can only be properly interpreted with the metadata from the regional analysis.
     * This is an intentional choice to avoid changing the file format, and in any case these files are not expected
-     * to ever be used separately from an environment where the Mongo database is available.
+     * to ever be used separately from an environment where the metadata is available.
      */
     private final int channels;
 
     /**
-     * Construct an writer for a single regional analysis result grid, using the proprietary
+     * Construct a writer for a single regional analysis result grid, using the proprietary
      * Conveyal grid format. This also creates the on-disk scratch buffer into which the results
     * from the workers will be accumulated.
      */
-    GridResultWriter (RegionalTask task, FileStorage fileStorage) {
-        super(fileStorage);
-        int width = task.width;
-        int height = task.height;
-        this.channels = task.cutoffsMinutes.length;
+    GridResultWriter(WebMercatorExtents ext, int channels, String fileName, FileStorage fileStorage) {
+        super(fileName, fileStorage);
+        long bodyBytes = (long) ext.width * ext.height * channels * Integer.BYTES;
+        this.channels = channels;
         LOG.info(
             "Expecting multi-origin results for grid with width {}, height {}, {} values per origin.",
-            width,
-            height,
+            ext.width,
+            ext.height,
             channels
         );
-        super.prepare(task.jobId);
 
         try {
             // Write the access grid file header to the temporary file.
             FileOutputStream fos = new FileOutputStream(bufferFile);
             LittleEndianIntOutputStream data = new LittleEndianIntOutputStream(fos);
             data.writeAscii("ACCESSGR");
             data.writeInt(ACCESS_GRID_VERSION);
-            data.writeInt(task.zoom);
-            data.writeInt(task.west);
-            data.writeInt(task.north);
-            data.writeInt(width);
-            data.writeInt(height);
+            data.writeInt(ext.zoom);
+            data.writeInt(ext.west);
+            data.writeInt(ext.north);
+            data.writeInt(ext.width);
+            data.writeInt(ext.height);
             data.writeInt(channels);
             data.close();
 
@@ -100,8 +99,8 @@ public class GridResultWriter extends BaseResultWriter {
             // IO limits on cloud servers with network storage. Even without initialization, any complete regional analysis
             // would overwrite every byte in the file with a result for some origin point, so the initial values are only
             // important when visualizing or debugging partially completed analysis results.
-            this.randomAccessFile = new RandomAccessFile(bufferFile, "rw");
-            randomAccessFile.setLength(HEADER_LENGTH_BYTES + (width * height * channels * Integer.BYTES));
+            randomAccessFile = new RandomAccessFile(bufferFile, "rw");
+            randomAccessFile.setLength(HEADER_LENGTH_BYTES + bodyBytes);
             LOG.info(
                 "Created temporary file to accumulate results from workers, size is {}.",
                 human(randomAccessFile.length(), "B")
@@ -113,8 +112,8 @@ public class GridResultWriter extends BaseResultWriter {
 
     /** Gzip the access grid and upload it to file storage (such as AWS S3). */
     @Override
-    protected synchronized void finish (String fileName) throws IOException {
-        super.finish(fileName);
+    protected synchronized void finish() throws IOException {
+        super.finish();
         randomAccessFile.close();
     }
 
@@ -138,7 +137,7 @@ synchronized void writeOneOrigin (int taskNumber, int[] values) throws IOException {
         if (values.length != channels) {
             throw new IllegalArgumentException("Number of channels to be written does not match this writer.");
         }
-        long offset = HEADER_LENGTH_BYTES + (taskNumber * channels * Integer.BYTES);
+        long offset = HEADER_LENGTH_BYTES + ((long) taskNumber * channels * Integer.BYTES);
         // RandomAccessFile is not threadsafe and multiple threads may call this, so synchronize.
         // TODO why is the method also synchronized then?
         synchronized (this) {
src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java
@@ -2,6 +2,7 @@
 
 import com.conveyal.analysis.models.RegionalAnalysis;
 import com.conveyal.file.FileStorage;
+import com.conveyal.r5.analyst.WebMercatorExtents;
 import com.conveyal.r5.analyst.cluster.RegionalTask;
 import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
 
@@ -10,37 +11,34 @@
  * same interface as our CSV writers, so CSV and Grids can be processed similarly in MultiOriginAssembler.
  */
 public class MultiGridResultWriter implements RegionalResultWriter {
-
-    private final RegionalAnalysis regionalAnalysis;
-
-    private final RegionalTask task;
-
     /**
      * We create one GridResultWriter for each destination pointset and percentile.
-     * Each of those output files contains data for all travel time cutoffs at each origin.
+     * Each of those output files contains data for all channels at each origin.
      */
-    private final GridResultWriter[][] accessibilityGridWriters;
-
-    /** The number of different percentiles for which we're calculating accessibility on the workers. */
-    private final int nPercentiles;
-
-    /** The number of destination pointsets to which we're calculating accessibility */
-    private final int nDestinationPointSets;
+    private final GridResultWriter[][] gridResultWriters;
 
     /** Constructor */
     public MultiGridResultWriter (
-        RegionalAnalysis regionalAnalysis, RegionalTask task, FileStorage fileStorage
+        RegionalAnalysis regionalAnalysis, RegionalTask task, int channels, FileStorage fileStorage
     ) {
-        // We are storing the regional analysis just to get its pointset IDs (not keys) and its own ID.
-        this.regionalAnalysis = regionalAnalysis;
-        this.task = task;
-        this.nPercentiles = task.percentiles.length;
-        this.nDestinationPointSets = task.makeTauiSite ? 0 : task.destinationPointSetKeys.length;
+        int nPercentiles = task.percentiles.length;
+        int nDestinationPointSets = task.makeTauiSite ? 0 : task.destinationPointSetKeys.length;
         // Create one grid writer per percentile and destination pointset.
-        accessibilityGridWriters = new GridResultWriter[nDestinationPointSets][nPercentiles];
+        gridResultWriters = new GridResultWriter[nDestinationPointSets][nPercentiles];
         for (int d = 0; d < nDestinationPointSets; d++) {
             for (int p = 0; p < nPercentiles; p++) {
-                accessibilityGridWriters[d][p] = new GridResultWriter(task, fileStorage);
+                String fileName = String.format(
+                    "%s_%s_P%d.access",
+                    regionalAnalysis._id,
+                    regionalAnalysis.destinationPointSetIds[d],
+                    task.percentiles[p]
+                );
+                gridResultWriters[d][p] = new GridResultWriter(
+                    WebMercatorExtents.forTask(task),
+                    channels,
+                    fileName,
+                    fileStorage
+                );
             }
         }
     }
@@ -51,20 +49,18 @@ public void writeOneWorkResult (RegionalWorkResult workResult) throws Exception {
         // TODO more efficient way to write little-endian integers
         // TODO check monotonic increasing invariants here rather than in worker.
         // Infer x and y cell indexes based on the template task
-        int taskNumber = workResult.taskId;
         for (int d = 0; d < workResult.accessibilityValues.length; d++) {
             int[][] percentilesForGrid = workResult.accessibilityValues[d];
-            for (int p = 0; p < nPercentiles; p++) {
-                int[] cutoffsForPercentile = percentilesForGrid[p];
-                GridResultWriter gridWriter = accessibilityGridWriters[d][p];
-                gridWriter.writeOneOrigin(taskNumber, cutoffsForPercentile);
+            for (int p = 0; p < percentilesForGrid.length; p++) {
+                GridResultWriter gridWriter = gridResultWriters[d][p];
+                gridWriter.writeOneOrigin(workResult.taskId, percentilesForGrid[p]);
             }
         }
     }
 
     @Override
     public void terminate () throws Exception {
-        for (GridResultWriter[] writers : accessibilityGridWriters) {
+        for (GridResultWriter[] writers : gridResultWriters) {
             for (GridResultWriter writer : writers) {
                 writer.terminate();
             }
@@ -73,14 +69,9 @@ public void terminate () throws Exception {
 
     @Override
     public void finish () throws Exception {
-        for (int d = 0; d < nDestinationPointSets; d++) {
-            for (int p = 0; p < nPercentiles; p++) {
-                int percentile = task.percentiles[p];
-                String destinationPointSetId = regionalAnalysis.destinationPointSetIds[d];
-                // TODO verify that regionalAnalysis._id is the same as job.jobId
-                String gridFileName =
-                    String.format("%s_%s_P%d.access", regionalAnalysis._id, destinationPointSetId, percentile);
-                accessibilityGridWriters[d][p].finish(gridFileName);
+        for (GridResultWriter[] gridResultWriterRow : gridResultWriters) {
+            for (GridResultWriter resultWriter : gridResultWriterRow) {
+                resultWriter.finish();
             }
         }
     }
src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java
@@ -10,7 +10,6 @@
 import com.conveyal.r5.analyst.cluster.PathResult;
 import com.conveyal.r5.analyst.cluster.RegionalTask;
 import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
-import com.conveyal.r5.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,7 +120,12 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileStorage fileStorage) {
             resultWriters.add(new AccessCsvResultWriter(job.templateTask, fileStorage));
         } else {
             // Gridded origins - create gridded regional analysis results
-            resultWriters.add(new MultiGridResultWriter(regionalAnalysis, job.templateTask, fileStorage));
+            resultWriters.add(new MultiGridResultWriter(
+                regionalAnalysis,
+                job.templateTask,
+                job.templateTask.cutoffsMinutes.length,
+                fileStorage
+            ));
         }
     }
 
@@ -157,7 +161,7 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileStorage fileStorage) {
             // FIXME instanceof+cast is ugly, do this some other way or even record the Grids
             if (writer instanceof CsvResultWriter) {
                 CsvResultWriter csvWriter = (CsvResultWriter) writer;
-                regionalAnalysis.resultStorage.put(csvWriter.resultType(), csvWriter.fileName);
+                regionalAnalysis.resultStorage.put(csvWriter.resultType, csvWriter.fileName);
             }
         }
     } catch (AnalysisServerException e) {
src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java
@@ -15,12 +15,7 @@
 public class PathCsvResultWriter extends CsvResultWriter {
 
     public PathCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException {
-        super(task, fileStorage);
-    }
-
-    @Override
-    public CsvResultType resultType () {
-        return CsvResultType.PATHS;
+        super(task, CsvResultType.PATHS, fileStorage);
     }
 
     @Override
src/main/java/com/conveyal/analysis/results/TemporalDensityCsvResultWriter.java
@@ -18,15 +18,10 @@ public class TemporalDensityCsvResultWriter extends CsvResultWriter {
     private final int dualThreshold;
 
     public TemporalDensityCsvResultWriter(RegionalTask task, FileStorage fileStorage) throws IOException {
-        super(task, fileStorage);
+        super(task, CsvResultType.TDENSITY, fileStorage);
         dualThreshold = task.dualAccessibilityThreshold;
     }
 
-    @Override
-    public CsvResultType resultType () {
-        return CsvResultType.TDENSITY;
-    }
-
     @Override
     public String[] columnHeaders () {
         List<String> headers = new ArrayList<>();