Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Expose collection re-index to non sysadmin users. #576

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.index;


import org.apache.usergrid.utils.StringUtils;

/**
* An interface for re-indexing all entities in an application
*/
Expand Down Expand Up @@ -47,6 +49,13 @@ public interface ReIndexService {
*/
ReIndexStatus getStatus( final String jobId );

/**
* Get the status of a collection job
* @param collectionName The collectionName for the rebuild index
* @return
*/
ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName );


/**
* The response when requesting a re-index operation
Expand All @@ -56,14 +65,27 @@ public class ReIndexStatus {
final Status status;
final long numberProcessed;
final long lastUpdated;
final String collectionName;


public ReIndexStatus( final String jobId, final Status status, final long numberProcessed,
final long lastUpdated ) {
this.jobId = jobId;
final long lastUpdated, final String collectionName ) {

if(StringUtils.isNotEmpty(jobId)){
this.jobId = jobId;
}else {
this.jobId = "";
}

this.status = status;
this.numberProcessed = numberProcessed;
this.lastUpdated = lastUpdated;

if(StringUtils.isNotEmpty(collectionName)){
this.collectionName = collectionName;
}else {
this.collectionName = "";
}
}


Expand All @@ -74,6 +96,13 @@ public String getJobId() {
return jobId;
}

/**
* Get the jobId used to resume this operation
*/
public String getCollectionName() {
return collectionName;
}


/**
* Get the last updated time, as a long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class ReIndexServiceImpl implements ReIndexService {
private static final String MAP_COUNT_KEY = "count";
private static final String MAP_STATUS_KEY = "status";
private static final String MAP_UPDATED_KEY = "lastUpdated";
private static final String MAP_SEPARATOR = "|||";


private final AllApplicationsObservable allApplicationsObservable;
Expand Down Expand Up @@ -140,7 +141,9 @@ public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBui

// create an observable that loads a batch to be indexed

if(reIndexRequestBuilder.getCollectionName().isPresent()) {
final boolean isForCollection = reIndexRequestBuilder.getCollectionName().isPresent();

if(isForCollection) {

String collectionName = InflectionUtils.pluralize(
CpNamingUtils.getNameFromEdgeType(reIndexRequestBuilder.getCollectionName().get() ));
Expand Down Expand Up @@ -175,12 +178,36 @@ public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBui
if( edgeScopes.size() > 0 ) {
writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
}
writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
.doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
if( isForCollection ){
writeStateMetaForCollection(
appId.get().getApplication().getUuid().toString(),
reIndexRequestBuilder.getCollectionName().get(),
Status.INPROGRESS, count.get(),
System.currentTimeMillis() );
}else{
writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() );
}
})
.doOnCompleted(() ->{
if( isForCollection ){
writeStateMetaForCollection(
appId.get().getApplication().getUuid().toString(),
reIndexRequestBuilder.getCollectionName().get(),
Status.COMPLETE, count.get(),
System.currentTimeMillis() );
}else {
writeStateMeta(jobId, Status.COMPLETE, count.get(), System.currentTimeMillis());
}
})
.subscribeOn( Schedulers.io() ).subscribe();

if(isForCollection){
return new ReIndexStatus( "", Status.STARTED, 0, 0, reIndexRequestBuilder.getCollectionName().get() );

}


return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
return new ReIndexStatus( jobId, Status.STARTED, 0, 0, "" );
}


Expand All @@ -196,38 +223,15 @@ public ReIndexStatus getStatus( final String jobId ) {
return getIndexResponse( jobId );
}


/**
* Simple collector that counts state, then flushed every time a buffer is provided. Writes final state when complete
*/
private class FlushingCollector {

private final String jobId;
private long count;


private FlushingCollector( final String jobId ) {
this.jobId = jobId;
}


public void flushBuffer( final List<EdgeScope> buffer ) {
count += buffer.size();

//write our cursor state
if ( buffer.size() > 0 ) {
writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
}

writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() );
}

public void complete(){
writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() );
}
@Override
public ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName ) {
Preconditions.checkNotNull( collectionName, "appIdString must not be null" );
Preconditions.checkNotNull( collectionName, "collectionName must not be null" );
return getIndexResponseForCollection( appIdString, collectionName );
}



/**
* Get the resume edge scope
*
Expand Down Expand Up @@ -346,15 +350,47 @@ private ReIndexStatus getIndexResponse( final String jobId ) {
final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );

if(stringStatus == null){
return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0 );
return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0, "" );
}

final Status status = Status.valueOf( stringStatus );

final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );

return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
return new ReIndexStatus( jobId, status, processedCount, lastUpdated, "" );
}


private void writeStateMetaForCollection(final String appIdString, final String collectionName,
final Status status, final long processedCount, final long lastUpdated ) {

if(logger.isDebugEnabled()) {
logger.debug( "Flushing state for collection {}, status {}, processedCount {}, lastUpdated {}",
collectionName, status, processedCount, lastUpdated);
}

mapManager.putString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY, status.name() );
mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY, processedCount );
mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY, lastUpdated );
}


private ReIndexStatus getIndexResponseForCollection( final String appIdString, final String collectionName ) {

final String stringStatus =
mapManager.getString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY );

if(stringStatus == null){
return new ReIndexStatus( "", Status.UNKNOWN, 0, 0, collectionName );
}

final Status status = Status.valueOf( stringStatus );

final long processedCount = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY );
final long lastUpdated = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY );

return new ReIndexStatus( "", status, processedCount, lastUpdated, collectionName );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,26 +251,41 @@ public ApiResponse clearCollectionJobGet(
}


// TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this.
// So system access only.
// TODO: use scheduler here to get around people sending a reindex call 30 times.

@POST
@Path("{itemName}/_reindex")
@Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
@RequireSystemAccess
@RequireApplicationAccess
@JSONP
public ApiResponse executePostForReindexing(
@Context UriInfo ui, String body,
@Context UriInfo ui, final Map<String, Object> payload,
@PathParam("itemName") PathSegment itemName,
@QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {

addItemToServiceContext( ui, itemName );

IndexResource indexResource = new IndexResource(injector);
return indexResource.rebuildIndexesPost(
return indexResource.rebuildIndexCollectionPost(payload,
services.getApplicationId().toString(),itemName.getPath(),false,callback );
}

@GET
@Path("{itemName}/_reindex")
@Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
@RequireApplicationAccess
@JSONP
public ApiResponse executeGetForReindexStatus(
@Context UriInfo ui, final Map<String, Object> payload,
@PathParam("itemName") PathSegment itemName,
@QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {

addItemToServiceContext( ui, itemName );

IndexResource indexResource = new IndexResource(injector);
return indexResource.rebuildIndexCollectionGet(services.getApplicationId().toString(), itemName.getPath(),
callback );
}


private CollectionDeleteService getCollectionDeleteService() {
return injector.getInstance( CollectionDeleteService.class );
Expand Down Expand Up @@ -310,18 +325,17 @@ private ApiResponse executeResumeAndCreateResponse( final Map<String, Object> pa
private ApiResponse executeAndCreateResponse(final CollectionDeleteRequestBuilder request, final String callback ) {


final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection( request );
final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection(request);

final ApiResponse response = createApiResponse();

response.setAction( "clear collection" );
response.setProperty( "jobId", status.getJobId() );
response.setProperty( "status", status.getStatus() );
response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
response.setProperty( "numberQueued", status.getNumberProcessed() );
response.setAction("clear collection");
response.setProperty("jobId", status.getJobId());
response.setProperty("status", status.getStatus());
response.setProperty("lastUpdatedEpoch", status.getLastUpdated());
response.setProperty("numberQueued", status.getNumberProcessed());
response.setSuccess();

return response;
}

}
Loading