Skip to content

Commit

Permalink
Merge pull request #382 from simonpoole/update
Browse files Browse the repository at this point in the history
Improvements to updating code
  • Loading branch information
lonvia authored Feb 19, 2019
2 parents 4d77277 + 35ae356 commit 7812e50
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 51 deletions.
4 changes: 4 additions & 0 deletions src/main/java/de/komoot/photon/elasticsearch/Updater.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public void delete(Long id) {
}

private void updateDocuments() {
if (this.bulkRequest.numberOfActions() == 0) {
log.warn("Update empty");
return;
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
log.error("error while bulk update: " + bulkResponse.buildFailureMessage());
Expand Down
36 changes: 28 additions & 8 deletions src/main/java/de/komoot/photon/nominatim/NominatimConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ boolean isUsefulForIndex() {
}

List<PhotonDoc> getDocsWithHousenumber() {
if (housenumbers == null || housenumbers.isEmpty())
if (housenumbers == null || housenumbers.isEmpty()) {
return ImmutableList.of(doc);
}

List<PhotonDoc> results = new ArrayList<PhotonDoc>(housenumbers.size());
for (Map.Entry<String, Point> e : housenumbers.entrySet()) {
Expand Down Expand Up @@ -205,6 +206,8 @@ public NominatimResult mapRow(ResultSet rs, int rowNum) throws SQLException {
}
};
private final String selectColsPlaceX = "place_id, osm_type, osm_id, class, type, name, housenumber, postcode, extratags, ST_Envelope(geometry) AS bbox, parent_place_id, linked_place_id, rank_search, importance, country_code, centroid";
private final String selectColsOsmline = "place_id, osm_id, parent_place_id, startnumber, endnumber, interpolationtype, postcode, country_code, linegeo";
private final String selectColsAddress = "p.place_id, p.osm_type, p.osm_id, p.name, p.class, p.type, p.rank_address, p.admin_level, p.postcode, p.extratags->'place' as place";
private Importer importer;

private Map<String, String> getCountryNames(String countrycode) {
Expand Down Expand Up @@ -246,15 +249,20 @@ public void setImporter(Importer importer) {
this.importer = importer;
}

public PhotonDoc getByPlaceId(long placeId) {
return template.queryForObject("SELECT " + selectColsPlaceX + " FROM placex WHERE place_id = ?", new Object[]{placeId}, placeRowMapper).getBaseDoc();
public List<PhotonDoc> getByPlaceId(long placeId) {
NominatimResult result = template.queryForObject("SELECT " + selectColsPlaceX + " FROM placex WHERE place_id = ?", new Object[] { placeId }, placeRowMapper);
completePlace(result.getBaseDoc());
return result.getDocsWithHousenumber();
}

/**
 * Builds the documents for one address interpolation line.
 * <p>
 * Selects the row with the given place id from {@code location_property_osmline},
 * maps it with {@code osmlineRowMapper}, hands the mapped base document to
 * {@code completePlace} and returns the result's documents expanded by housenumber.
 * <p>
 * NOTE(review): {@code queryForObject} presumably raises a data-access exception
 * when the id does not match exactly one row - confirm against callers.
 *
 * @param placeId place id of the interpolation line
 * @return the documents produced by {@code getDocsWithHousenumber()} for that line
 */
public List<PhotonDoc> getInterpolationsByPlaceId(long placeId) {
    final String sql = "SELECT " + selectColsOsmline + " FROM location_property_osmline WHERE place_id = ?";
    final Object[] args = new Object[] { placeId };

    final NominatimResult interpolation = template.queryForObject(sql, args, osmlineRowMapper);
    completePlace(interpolation.getBaseDoc());

    return interpolation.getDocsWithHousenumber();
}

List<AddressRow> getAddresses(PhotonDoc doc) {
long placeId = doc.getPlaceId();
if (doc.getRankSearch() > 28)
placeId = doc.getParentPlaceId();
return template.query("SELECT p.place_id, p.osm_type, p.osm_id, p.name, p.class, p.type, p.rank_address, p.admin_level, p.postcode, p.extratags->'place' as place FROM placex p, place_addressline pa WHERE p.place_id = pa.address_place_id and pa.place_id = ? and pa.cached_rank_address > 4 and pa.address_place_id != ? and pa.isaddress order by rank_address desc,fromarea desc,distance asc,rank_search desc", new Object[]{placeId, doc.getPlaceId()}, new RowMapper<AddressRow>() {
RowMapper<AddressRow> rowMapper = new RowMapper<AddressRow>() {
@Override
public AddressRow mapRow(ResultSet rs, int rowNum) throws SQLException {
Integer adminLevel = rs.getInt("admin_level");
Expand All @@ -274,7 +282,19 @@ public AddressRow mapRow(ResultSet rs, int rowNum) throws SQLException {
rs.getLong("osm_id")
);
}
});
};

boolean isPoi = doc.getRankSearch() > 28;
long placeId = (isPoi) ? doc.getParentPlaceId() : doc.getPlaceId();

List<AddressRow> terms = template.query("SELECT " + selectColsAddress + " FROM placex p, place_addressline pa WHERE p.place_id = pa.address_place_id and pa.place_id = ? and pa.cached_rank_address > 4 and pa.address_place_id != ? and pa.isaddress order by rank_address desc,fromarea desc,distance asc,rank_search desc", new Object[]{placeId, placeId}, rowMapper);

if (isPoi) {
// need to add the term for the parent place ID itself
terms.addAll(0, template.query("SELECT " + selectColsAddress + " FROM placex p WHERE p.place_id = ?", new Object[]{placeId}, rowMapper));
}

return terms;
}

private static final PhotonDoc FINAL_DOCUMENT = new PhotonDoc(0, null, 0, null, null, null, null, null, null, 0, 0, null, null, 0, 0);
Expand Down
184 changes: 141 additions & 43 deletions src/main/java/de/komoot/photon/nominatim/NominatimUpdater.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/**
* Nominatim update logic
Expand All @@ -21,69 +22,166 @@

public class NominatimUpdater {
private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(NominatimUpdater.class);
private final Integer minRank = 1;
private final Integer maxRank = 30;
private final JdbcTemplate template;

private static final int CREATE = 1;
private static final int UPDATE = 2;
private static final int DELETE = 100;

private static final int MIN_RANK = 1;
private static final int MAX_RANK = 30;

private final JdbcTemplate template;
private final NominatimConnector exporter;

private Updater updater;

/**
* Held for the duration of an update run so that other threads are locked out.
*/
private ReentrantLock updateLock = new ReentrantLock();

/**
 * Sets the {@link Updater} that receives the changes found by {@link #update()}.
 * <p>
 * {@link #update()} calls create/updateOrCreate/delete/finish on this object
 * without a null check.
 * NOTE(review): presumably this must be called before the first update() - confirm
 * that the updater is not initialised anywhere else.
 *
 * @param updater target that documents are created in, updated in and deleted from
 */
public void setUpdater(Updater updater) {
this.updater = updater;
}

public void update() {
for (Integer rank = this.minRank; rank <= this.maxRank; rank++) {
LOGGER.info(String.format("Starting rank %d", rank));
for (Map<String, Object> sector : getIndexSectors(rank))
for (UpdateRow place : getIndexSectorPlaces(rank, (Integer) sector.get("geometry_sector"))) {

template.update("update placex set indexed_status = 0 where place_id = ?", place.getPlaceId());
final PhotonDoc updatedDoc = exporter.getByPlaceId(place.getPlaceId());

switch (place.getIndexdStatus()) {
case 1:
if (updatedDoc.isUsefulForIndex())
updater.create(updatedDoc);
break;
case 2:
if (!updatedDoc.isUsefulForIndex())
updater.delete(place.getPlaceId());

updater.updateOrCreate(updatedDoc);
break;
case 100:
updater.delete(place.getPlaceId());
break;
default:
LOGGER.error(String.format("Unknown index status %d", place.getIndexdStatus()));
break;
if (updateLock.tryLock()) {
try {
int updatedPlaces = 0;
int deletedPlaces = 0;
for (int rank = MIN_RANK; rank <= MAX_RANK; rank++) {
LOGGER.info(String.format("Starting rank %d", rank));
for (Map<String, Object> sector : getIndexSectors(rank)) {
for (UpdateRow place : getIndexSectorPlaces(rank, (Integer) sector.get("geometry_sector"))) {
long placeId = place.getPlaceId();
template.update("update placex set indexed_status = 0 where place_id = ?;", placeId);

Integer indexedStatus = place.getIndexdStatus();
if (indexedStatus == DELETE || (indexedStatus == UPDATE && rank == MAX_RANK)) {
updater.delete(placeId);
if (indexedStatus == DELETE) {
deletedPlaces++;
continue;
}
indexedStatus = CREATE; // always create
}
updatedPlaces++;

final List<PhotonDoc> updatedDocs = exporter.getByPlaceId(place.getPlaceId());
boolean wasUseful = false;
for (PhotonDoc updatedDoc : updatedDocs) {
switch (indexedStatus) {
case CREATE:
if (updatedDoc.isUsefulForIndex()) {
updater.create(updatedDoc);
}
break;
case UPDATE:
if (updatedDoc.isUsefulForIndex()) {
updater.updateOrCreate(updatedDoc);
wasUseful = true;
}
break;
default:
LOGGER.error(String.format("Unknown index status %d", indexedStatus));
break;
}
}
if (indexedStatus == UPDATE && !wasUseful) {
// only true when rank != 30
// if no documents for the place id exist this will likely cause moaning
updater.delete(placeId);
updatedPlaces--;
}
}
}
}
}

updater.finish();
LOGGER.info(String.format("%d places created or updated, %d deleted", updatedPlaces, deletedPlaces));

// update documents generated from address interpolations
// .isUsefulForIndex() should always return true for documents
// created from interpolations so no need to check them
LOGGER.info("Starting interpolations");
int updatedInterpolations = 0;
int deletedInterpolations = 0;
int interpolationDocuments = 0;
for (Map<String, Object> sector : template.queryForList(
"select geometry_sector,count(*) from location_property_osmline where indexed_status > 0 group by geometry_sector order by geometry_sector;")) {
for (UpdateRow place : getIndexSectorInterpolations((Integer) sector.get("geometry_sector"))) {
long placeId = place.getPlaceId();
template.update("update location_property_osmline set indexed_status = 0 where place_id = ?;", placeId);

Integer indexedStatus = place.getIndexdStatus();
if (indexedStatus != CREATE) {
updater.delete(placeId);
if (indexedStatus == DELETE) {
deletedInterpolations++;
continue;
}
}
updatedInterpolations++;

final List<PhotonDoc> updatedDocs = exporter.getInterpolationsByPlaceId(place.getPlaceId());
for (PhotonDoc updatedDoc : updatedDocs) {
updater.create(updatedDoc);
interpolationDocuments++;
}
}
}
LOGGER.info(String.format("%d interpolations created or updated, %d deleted, %d documents added or updated", updatedInterpolations,
deletedInterpolations, interpolationDocuments));
updater.finish();
template.update("update import_status set indexed=true;"); // indicate that we are finished

LOGGER.info("Finished updating");
} finally {
updateLock.unlock();
}
} else {
LOGGER.info("Update already in progress");
}
}

private List<Map<String, Object>> getIndexSectors(Integer rank) {
return template.queryForList("select geometry_sector,count(*) from placex where rank_search = ? " +
"and indexed_status > 0 group by geometry_sector order by geometry_sector;", rank);
return template.queryForList("select geometry_sector,count(*) from placex where rank_search = ? "
+ "and indexed_status > 0 group by geometry_sector order by geometry_sector;", rank);
}

private List<UpdateRow> getIndexSectorPlaces(Integer rank, Integer geometrySector) {
return template.query("select place_id, indexed_status from placex where rank_search = ?" +
" and geometry_sector = ? and indexed_status > 0;", new Object[]{rank, geometrySector}, new RowMapper<UpdateRow>() {
@Override
public UpdateRow mapRow(ResultSet rs, int rowNum) throws SQLException {
UpdateRow updateRow = new UpdateRow();
updateRow.setPlaceId(rs.getLong("place_id"));
updateRow.setIndexdStatus(rs.getInt("indexed_status"));
return updateRow;
}
});
return template.query("select place_id, indexed_status from placex where rank_search = ?" + " and geometry_sector = ? and indexed_status > 0;",
new Object[] { rank, geometrySector }, new RowMapper<UpdateRow>() {
@Override
public UpdateRow mapRow(ResultSet rs, int rowNum) throws SQLException {
UpdateRow updateRow = new UpdateRow();
updateRow.setPlaceId(rs.getLong("place_id"));
updateRow.setIndexdStatus(rs.getInt("indexed_status"));
return updateRow;
}
});
}

/**
 * Lists the interpolation rows of one geometry sector that still need indexing.
 * <p>
 * Reads {@code place_id} and {@code indexed_status} from
 * {@code location_property_osmline} for every row in the sector whose
 * {@code indexed_status} is greater than zero.
 *
 * @param geometrySector value matched against the {@code geometry_sector} column
 * @return one {@link UpdateRow} per matching row, carrying place id and indexed status
 */
private List<UpdateRow> getIndexSectorInterpolations(Integer geometrySector) {
    final String sql = "select place_id, indexed_status from location_property_osmline where geometry_sector = ? and indexed_status > 0;";

    final RowMapper<UpdateRow> pendingRowMapper = new RowMapper<UpdateRow>() {
        @Override
        public UpdateRow mapRow(ResultSet rs, int rowNum) throws SQLException {
            final UpdateRow pending = new UpdateRow();
            pending.setPlaceId(rs.getLong("place_id"));
            pending.setIndexdStatus(rs.getInt("indexed_status"));
            return pending;
        }
    };

    return template.query(sql, new Object[] { geometrySector }, pendingRowMapper);
}

/**
* Creates a new instance
*
* @param host Nominatim database host
* @param port Nominatim database port
* @param database Nominatim database name
* @param username Nominatim database username
* @param password Nominatim database password
*/
public NominatimUpdater(String host, int port, String database, String username, String password) {
BasicDataSource dataSource = new BasicDataSource();
Expand All @@ -92,7 +190,7 @@ public NominatimUpdater(String host, int port, String database, String username,
dataSource.setUsername(username);
dataSource.setPassword(password);
dataSource.setDriverClassName(JtsWrapper.class.getCanonicalName());
dataSource.setDefaultAutoCommit(false);
dataSource.setDefaultAutoCommit(true);

exporter = new NominatimConnector(host, port, database, username, password);
template = new JdbcTemplate(dataSource);
Expand Down

0 comments on commit 7812e50

Please sign in to comment.