Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Initial commit for exposing collection re-index to non sysadmin users.
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelarusso committed Oct 5, 2017
1 parent ac2337c commit 1ca08f3
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.index;


import org.apache.usergrid.utils.StringUtils;

/**
* An interface for re-indexing all entities in an application
*/
Expand Down Expand Up @@ -47,6 +49,13 @@ public interface ReIndexService {
*/
ReIndexStatus getStatus( final String jobId );

/**
* Get the status of a collection job
* @param collectionName The collectionName for the rebuild index
* @return
*/
ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName );


/**
* The response when requesting a re-index operation
Expand All @@ -56,14 +65,27 @@ public class ReIndexStatus {
final Status status;
final long numberProcessed;
final long lastUpdated;
final String collectionName;


public ReIndexStatus( final String jobId, final Status status, final long numberProcessed,
final long lastUpdated ) {
this.jobId = jobId;
final long lastUpdated, final String collectionName ) {

if(StringUtils.isNotEmpty(jobId)){
this.jobId = jobId;
}else {
this.jobId = "";
}

this.status = status;
this.numberProcessed = numberProcessed;
this.lastUpdated = lastUpdated;

if(StringUtils.isNotEmpty(collectionName)){
this.collectionName = collectionName;
}else {
this.collectionName = "";
}
}


Expand All @@ -74,6 +96,13 @@ public String getJobId() {
return jobId;
}

/**
* Get the jobId used to resume this operation
*/
public String getCollectionName() {
return collectionName;
}


/**
* Get the last updated time, as a long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class ReIndexServiceImpl implements ReIndexService {
private static final String MAP_COUNT_KEY = "count";
private static final String MAP_STATUS_KEY = "status";
private static final String MAP_UPDATED_KEY = "lastUpdated";
private static final String MAP_SEPARATOR = "|||";


private final AllApplicationsObservable allApplicationsObservable;
Expand Down Expand Up @@ -140,7 +141,9 @@ public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBui

// create an observable that loads a batch to be indexed

if(reIndexRequestBuilder.getCollectionName().isPresent()) {
final boolean isForCollection = reIndexRequestBuilder.getCollectionName().isPresent();

if(isForCollection) {

String collectionName = InflectionUtils.pluralize(
CpNamingUtils.getNameFromEdgeType(reIndexRequestBuilder.getCollectionName().get() ));
Expand Down Expand Up @@ -175,12 +178,36 @@ public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBui
if( edgeScopes.size() > 0 ) {
writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
}
writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
.doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
if( isForCollection ){
writeStateMetaForCollection(
appId.get().getApplication().getUuid().toString(),
reIndexRequestBuilder.getCollectionName().get(),
Status.INPROGRESS, count.get(),
System.currentTimeMillis() );
}else{
writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() );
}
})
.doOnCompleted(() ->{
if( isForCollection ){
writeStateMetaForCollection(
appId.get().getApplication().getUuid().toString(),
reIndexRequestBuilder.getCollectionName().get(),
Status.COMPLETE, count.get(),
System.currentTimeMillis() );
}else {
writeStateMeta(jobId, Status.COMPLETE, count.get(), System.currentTimeMillis());
}
})
.subscribeOn( Schedulers.io() ).subscribe();

if(isForCollection){
return new ReIndexStatus( "", Status.STARTED, 0, 0, reIndexRequestBuilder.getCollectionName().get() );

}


return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
return new ReIndexStatus( jobId, Status.STARTED, 0, 0, "" );
}


Expand All @@ -196,38 +223,15 @@ public ReIndexStatus getStatus( final String jobId ) {
return getIndexResponse( jobId );
}


/**
* Simple collector that counts state, then flushed every time a buffer is provided. Writes final state when complete
*/
private class FlushingCollector {

private final String jobId;
private long count;


private FlushingCollector( final String jobId ) {
this.jobId = jobId;
}


public void flushBuffer( final List<EdgeScope> buffer ) {
count += buffer.size();

//write our cursor state
if ( buffer.size() > 0 ) {
writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
}

writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() );
}

public void complete(){
writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() );
}
@Override
public ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName ) {
Preconditions.checkNotNull( collectionName, "appIdString must not be null" );
Preconditions.checkNotNull( collectionName, "collectionName must not be null" );
return getIndexResponseForCollection( appIdString, collectionName );
}



/**
* Get the resume edge scope
*
Expand Down Expand Up @@ -346,15 +350,47 @@ private ReIndexStatus getIndexResponse( final String jobId ) {
final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );

if(stringStatus == null){
return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0 );
return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0, "" );
}

final Status status = Status.valueOf( stringStatus );

final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );

return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
return new ReIndexStatus( jobId, status, processedCount, lastUpdated, "" );
}


private void writeStateMetaForCollection(final String appIdString, final String collectionName,
final Status status, final long processedCount, final long lastUpdated ) {

if(logger.isDebugEnabled()) {
logger.debug( "Flushing state for collection {}, status {}, processedCount {}, lastUpdated {}",
collectionName, status, processedCount, lastUpdated);
}

mapManager.putString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY, status.name() );
mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY, processedCount );
mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY, lastUpdated );
}


private ReIndexStatus getIndexResponseForCollection( final String appIdString, final String collectionName ) {

final String stringStatus =
mapManager.getString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY );

if(stringStatus == null){
return new ReIndexStatus( "", Status.UNKNOWN, 0, 0, collectionName );
}

final Status status = Status.valueOf( stringStatus );

final long processedCount = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY );
final long lastUpdated = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY );

return new ReIndexStatus( "", status, processedCount, lastUpdated, collectionName );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,26 +251,41 @@ public ApiResponse clearCollectionJobGet(
}


// TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this.
// So system access only.
// TODO: use scheduler here to get around people sending a reindex call 30 times.

@POST
@Path("{itemName}/_reindex")
@Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
@RequireSystemAccess
@RequireApplicationAccess
@JSONP
public ApiResponse executePostForReindexing(
@Context UriInfo ui, String body,
@Context UriInfo ui, final Map<String, Object> payload,
@PathParam("itemName") PathSegment itemName,
@QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {

addItemToServiceContext( ui, itemName );

IndexResource indexResource = new IndexResource(injector);
return indexResource.rebuildIndexesPost(
return indexResource.rebuildIndexCollectionPost(payload,
services.getApplicationId().toString(),itemName.getPath(),false,callback );
}

@GET
@Path("{itemName}/_reindex")
@Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
@RequireApplicationAccess
@JSONP
public ApiResponse executeGetForReindexStatus(
@Context UriInfo ui, final Map<String, Object> payload,
@PathParam("itemName") PathSegment itemName,
@QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {

addItemToServiceContext( ui, itemName );

IndexResource indexResource = new IndexResource(injector);
return indexResource.rebuildIndexCollectionGet(services.getApplicationId().toString(), itemName.getPath(),
callback );
}


private CollectionDeleteService getCollectionDeleteService() {
return injector.getInstance( CollectionDeleteService.class );
Expand Down Expand Up @@ -310,18 +325,17 @@ private ApiResponse executeResumeAndCreateResponse( final Map<String, Object> pa
private ApiResponse executeAndCreateResponse(final CollectionDeleteRequestBuilder request, final String callback ) {


final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection( request );
final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection(request);

final ApiResponse response = createApiResponse();

response.setAction( "clear collection" );
response.setProperty( "jobId", status.getJobId() );
response.setProperty( "status", status.getStatus() );
response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
response.setProperty( "numberQueued", status.getNumberProcessed() );
response.setAction("clear collection");
response.setProperty("jobId", status.getJobId());
response.setProperty("status", status.getStatus());
response.setProperty("lastUpdatedEpoch", status.getLastUpdated());
response.setProperty("numberQueued", status.getNumberProcessed());
response.setSuccess();

return response;
}

}
Loading

0 comments on commit 1ca08f3

Please sign in to comment.