Skip to content

Commit

Permalink
Merge pull request #3577 from ingef/reintegrate-main
Browse files Browse the repository at this point in the history
Reintegrate Main
  • Loading branch information
awildturtok authored Sep 30, 2024
2 parents 3aa7b83 + 99446aa commit 4a10e08
Show file tree
Hide file tree
Showing 51 changed files with 610 additions and 425 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.bakdata.conquery.apiv1;

import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.text.NumberFormat;
import java.time.LocalDate;
import java.util.ArrayList;
Expand All @@ -21,12 +19,6 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import jakarta.inject.Inject;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.validation.Validator;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;

import com.bakdata.conquery.apiv1.execution.ExecutionStatus;
import com.bakdata.conquery.apiv1.execution.FullExecutionStatus;
Expand Down Expand Up @@ -89,6 +81,12 @@
import com.bakdata.conquery.util.QueryUtils;
import com.bakdata.conquery.util.QueryUtils.NamespacedIdentifiableCollector;
import com.bakdata.conquery.util.io.IdColumnUtil;
import jakarta.inject.Inject;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.validation.Validator;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -123,11 +121,15 @@ public Stream<ExecutionStatus> getQueriesFiltered(DatasetId datasetId, UriBuilde
// to exclude subtypes from somewhere else
.filter(QueryProcessor::canFrontendRender)
.filter(Predicate.not(ManagedExecution::isSystem))
.filter(q -> q.getState().equals(ExecutionState.DONE) || q.getState().equals(ExecutionState.NEW))
.filter(q -> {
ExecutionState state = q.getState();
return state == ExecutionState.NEW || state == ExecutionState.DONE;
}
)
.filter(q -> subject.isPermitted(q, Ability.READ))
.map(mq -> {
Namespace namespace = datasetRegistry.get(mq.getDataset().getId());
final OverviewExecutionStatus status = mq.buildStatusOverview(uriBuilder.clone(), subject, namespace);
final OverviewExecutionStatus status = mq.buildStatusOverview(uriBuilder.clone(), subject);

if (mq.isReadyToDownload()) {
status.setResultUrls(getResultAssets(config.getResultProviders(), mq, uriBuilder, allProviders));
}
Expand Down Expand Up @@ -170,12 +172,12 @@ private static boolean canFrontendRender(ManagedExecution q) {
public static List<ResultAsset> getResultAssets(List<ResultRendererProvider> renderer, ManagedExecution exec, UriBuilder uriBuilder, boolean allProviders) {

return renderer.stream()
.map(r -> {
.map(rendererProvider -> {
try {
return r.generateResultURLs(exec, uriBuilder.clone(), allProviders);
return rendererProvider.generateResultURLs(exec, uriBuilder.clone(), allProviders);
}
catch (MalformedURLException | URISyntaxException e) {
log.error("Cannot generate result urls for execution '{}' with provider '{}'", exec.getId(), r.getClass().getName());
catch (Exception e) {
log.error("Cannot generate result urls for execution '{}' with provider {}", exec.getId(), rendererProvider, e);
return null;
}
})
Expand All @@ -200,13 +202,13 @@ public static boolean isFrontendStructure(CQElement root) {
public void cancel(Subject subject, Dataset dataset, ManagedExecution query) {

// Does not make sense to cancel a query that isn't running.
ExecutionManager executionManager = datasetRegistry.get(dataset.getId()).getExecutionManager();
if (!query.getState().equals(ExecutionState.RUNNING)) {
return;
}

log.info("User[{}] cancelled Query[{}]", subject.getId(), query.getId());

final ExecutionManager executionManager = datasetRegistry.get(dataset.getId()).getExecutionManager();
executionManager.cancelQuery(query);
}

Expand Down Expand Up @@ -258,6 +260,7 @@ public void patchQuery(Subject subject, ManagedExecution execution, MetaDataPatc
public void reexecute(Subject subject, ManagedExecution query) {
log.info("User[{}] reexecuted Query[{}]", subject.getId(), query);

ExecutionManager executionManager = datasetRegistry.get(query.getDataset().getId()).getExecutionManager();
if (!query.getState().equals(ExecutionState.RUNNING)) {
final Namespace namespace = query.getNamespace();

Expand All @@ -283,7 +286,7 @@ public ExecutionState awaitDone(ManagedExecution query, int time, TimeUnit unit)
public FullExecutionStatus getQueryFullStatus(ManagedExecution query, Subject subject, UriBuilder url, Boolean allProviders) {
final Namespace namespace = datasetRegistry.get(query.getDataset().getId());

query.initExecutable(namespace, config);
query.initExecutable(config);

final FullExecutionStatus status = query.buildStatusFull(subject, namespace);

Expand Down Expand Up @@ -330,7 +333,7 @@ public ExternalUploadResult uploadEntities(Subject subject, Dataset dataset, Ext
execution.setLabel(upload.getLabel());
}

execution.initExecutable(namespace, config);
execution.initExecutable(config);

return new ExternalUploadResult(execution.getId(), statistic.getResolved().size(), statistic.getUnresolvedId(), statistic.getUnreadableDate());
}
Expand All @@ -356,7 +359,8 @@ public FullExecutionStatus getSingleEntityExport(Subject subject, UriBuilder uri
final EntityPreviewExecution execution = (EntityPreviewExecution) postQuery(dataset, form, subject, true);


if (namespace.getExecutionManager().awaitDone(execution, 10, TimeUnit.SECONDS) == ExecutionState.RUNNING) {
ExecutionManager executionManager = namespace.getExecutionManager();
if (executionManager.awaitDone(execution, 10, TimeUnit.SECONDS) == ExecutionState.RUNNING) {
log.warn("Still waiting for {} after 10 Seconds.", execution.getId());
throw new ConqueryError.ExecutionProcessingTimeoutError();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import com.bakdata.conquery.models.identifiable.ids.specific.SecondaryIdDescriptionId;
import com.bakdata.conquery.models.identifiable.ids.specific.UserId;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.FieldNameConstants;

@NoArgsConstructor
@AllArgsConstructor
@ToString
@Data
@FieldNameConstants
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.bakdata.conquery.apiv1.execution;

import lombok.Builder;
import lombok.NoArgsConstructor;

/**
* Light weight description of an execution. Rendering the overview should not cause heavy computations.
*/
@NoArgsConstructor
@Builder
public class OverviewExecutionStatus extends ExecutionStatus {

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import com.bakdata.conquery.models.query.QueryResolveContext;
import com.bakdata.conquery.models.query.Visitable;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.core.JsonParser;
Expand Down Expand Up @@ -132,8 +133,8 @@ public String getFormType() {
}

@Override
public ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) {
return new ExternalExecution(this, user, submittedDataset, storage);
public ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry) {
return new ExternalExecution(this, user, submittedDataset, storage, datasetRegistry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import com.bakdata.conquery.models.auth.permissions.Ability;
import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.error.ConqueryError;
import com.bakdata.conquery.models.execution.ManagedExecution;
import com.bakdata.conquery.models.forms.frontendconfiguration.FormScanner;
import com.bakdata.conquery.models.forms.frontendconfiguration.FormType;
import com.bakdata.conquery.models.forms.managed.ManagedForm;
import com.bakdata.conquery.models.query.visitor.QueryVisitor;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -28,7 +28,7 @@ public abstract class Form implements QueryDescription {

/**
* Raw form config (basically the raw format of this form), that is used by the backend at the moment to
* create a {@link com.bakdata.conquery.models.forms.configs.FormConfig} upon start of this form (see {@link ManagedForm#start()}).
* create a {@link com.bakdata.conquery.models.forms.configs.FormConfig} upon start of this form (see {@link ManagedExecution#start()}).
*/
@Nullable
public abstract JsonNode getValues();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import javax.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.ValidationException;
Expand Down Expand Up @@ -37,6 +36,7 @@
import com.bakdata.conquery.models.query.ManagedQuery;
import com.bakdata.conquery.models.query.QueryResolveContext;
import com.bakdata.conquery.models.query.Visitable;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonManagedReference;
Expand Down Expand Up @@ -215,7 +215,7 @@ public static void enable(List<CQElement> features) {


@Override
public ManagedForm toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) {
return new ManagedInternalForm(this, user, submittedDataset, storage);
public ManagedForm<?> toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry) {
return new ManagedInternalForm<>(this, user, submittedDataset, storage, datasetRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

import javax.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
Expand All @@ -33,6 +32,7 @@
import com.bakdata.conquery.models.query.ManagedQuery;
import com.bakdata.conquery.models.query.QueryResolveContext;
import com.bakdata.conquery.models.query.Visitable;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -126,7 +126,7 @@ public String getLocalizedTypeLabel() {


@Override
public ManagedInternalForm<FullExportForm> toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) {
return new ManagedInternalForm<>(this, user, submittedDataset, storage);
public ManagedInternalForm<FullExportForm> toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry) {
return new ManagedInternalForm<>(this, user, submittedDataset, storage, datasetRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.bakdata.conquery.models.query.queryplan.QueryPlan;
import com.bakdata.conquery.models.query.resultinfo.ResultInfo;
import com.bakdata.conquery.models.query.results.EntityResult;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.EqualsAndHashCode;

Expand All @@ -41,8 +42,8 @@ public Set<ManagedExecutionId> collectRequiredQueries() {
public abstract List<ResultInfo> getResultInfos(PrintSettings printSettings);

@Override
public ManagedQuery toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) {
return new ManagedQuery(this, user, submittedDataset, storage);
public ManagedQuery toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry) {
return new ManagedQuery(this, user, submittedDataset, storage, datasetRegistry);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.bakdata.conquery.models.query.RequiredEntities;
import com.bakdata.conquery.models.query.Visitable;
import com.bakdata.conquery.models.query.visitor.QueryVisitor;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.util.QueryUtils;
import com.bakdata.conquery.util.QueryUtils.ExternalIdChecker;
import com.bakdata.conquery.util.QueryUtils.NamespacedIdentifiableCollector;
Expand All @@ -44,7 +45,7 @@ public interface QueryDescription extends Visitable {
* @param storage
* @return
*/
ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage);
ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry);


Set<ManagedExecutionId> collectRequiredQueries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class ClusterNamespaceHandler implements NamespaceHandler<DistributedName
@Override
public DistributedNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, DatasetRegistry<DistributedNamespace> datasetRegistry, Environment environment) {
NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, internalMapperFactory, datasetRegistry);
DistributedExecutionManager executionManager = new DistributedExecutionManager(metaStorage, clusterState);
DistributedExecutionManager executionManager = new DistributedExecutionManager(metaStorage, datasetRegistry, clusterState);
WorkerHandler workerHandler = new WorkerHandler(namespaceData.getCommunicationMapper(), namespaceStorage);
clusterState.getWorkerHandlers().put(namespaceStorage.getDataset().getId(), workerHandler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto
SqlExecutionService sqlExecutionService = new SqlExecutionService(dslContext, resultSetProcessor);
NodeConversions nodeConversions = new NodeConversions(idColumns, sqlDialect, dslContext, databaseConfig, sqlExecutionService);
SqlConverter sqlConverter = new SqlConverter(nodeConversions, config);
ExecutionManager executionManager = new SqlExecutionManager(sqlConverter, sqlExecutionService, metaStorage);
ExecutionManager executionManager = new SqlExecutionManager(sqlConverter, sqlExecutionService, metaStorage, datasetRegistry);
SqlStorageHandler sqlStorageHandler = new SqlStorageHandler(sqlExecutionService);
SqlEntityResolver sqlEntityResolver = new SqlEntityResolver(idColumns, dslContext, sqlDialect, sqlExecutionService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@
import com.bakdata.conquery.resources.api.ResultCsvResource;
import io.dropwizard.jersey.DropwizardResourceConfig;
import jakarta.ws.rs.core.UriBuilder;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.internal.inject.AbstractBinder;

@Slf4j
@NoArgsConstructor
@AllArgsConstructor
@Data
@CPSType(base = ResultRendererProvider.class, id = "CSV")
public class CsvResultProvider implements ResultRendererProvider {
private boolean hidden = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.Collection;
import java.util.Collections;
import jakarta.ws.rs.core.UriBuilder;

import com.bakdata.conquery.apiv1.execution.ResultAsset;
import com.bakdata.conquery.commands.ManagerNode;
Expand All @@ -13,11 +12,14 @@
import com.bakdata.conquery.models.forms.managed.ExternalExecution;
import com.bakdata.conquery.resources.api.ResultExternalResource;
import io.dropwizard.jersey.DropwizardResourceConfig;
import jakarta.ws.rs.core.UriBuilder;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.glassfish.hk2.utilities.binding.AbstractBinder;

@Getter
@Data
@CPSType(base = ResultRendererProvider.class, id = "EXTERNAL")
public class ExternalResultProvider implements ResultRendererProvider {

Expand Down
Loading

0 comments on commit 4a10e08

Please sign in to comment.