Skip to content

Commit

Permalink
Eng 657 frontend allow users to view what tables (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
eunice-chan authored Jun 13, 2022
1 parent 7b42ae1 commit e6bbc9b
Show file tree
Hide file tree
Showing 35 changed files with 1,519 additions and 93 deletions.
9 changes: 7 additions & 2 deletions scripts/install_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ def execute_command(args, cwd=None):

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--update-ui', dest="update_ui", default=False, action='store_true',
help="Whether to build and replace UI files.")
parser.add_argument(
"--update-ui",
dest="update_ui",
default=False,
action="store_true",
help="Whether to build and replace UI files.",
)
args = parser.parse_args()
print("Current directory should be the root directory of the aqueduct repo.")
cwd = os.getcwd()
Expand Down
189 changes: 189 additions & 0 deletions src/golang/cmd/server/handler/create_table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package handler

import (
"context"
"fmt"
"io"
"net/http"
"time"

"github.com/aqueducthq/aqueduct/cmd/server/routes"
"github.com/aqueducthq/aqueduct/lib/collections/integration"
"github.com/aqueducthq/aqueduct/lib/collections/operator_result"
"github.com/aqueducthq/aqueduct/lib/collections/shared"
aq_context "github.com/aqueducthq/aqueduct/lib/context"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/job"
"github.com/aqueducthq/aqueduct/lib/storage"
"github.com/aqueducthq/aqueduct/lib/vault"
"github.com/aqueducthq/aqueduct/lib/workflow/operator/connector"
"github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/auth"
"github.com/aqueducthq/aqueduct/lib/workflow/utils"
"github.com/dropbox/godropbox/errors"
"github.com/go-chi/chi"
"github.com/google/uuid"
)

// Route: /integration/{integrationId}/create
// Method: POST
// Params:
// `integrationId`: ID for `integration` object
// ** ONLY SUPPORTS CREATING TABLES FOR THE DEMO DB **
// Request:
// Headers:
// `table-name`: name of table to create
// `api-key`: user's API Key
// Body:
// the CSV file to upload to the integration.

const (
pollCreateInterval = 500 * time.Millisecond
pollCreateTimeout = 2 * time.Minute
)

// Creates a table in the specified integration.
type CreateTableHandler struct {
PostHandler

Database database.Database
IntegrationReader integration.Reader
StorageConfig *shared.StorageConfig
JobManager job.JobManager
Vault vault.Vault
}

type CreateTableArgs struct {
*aq_context.AqContext
tableName string
integrationId uuid.UUID
csv []byte
}

type CreateTableResponse struct{}

func (*CreateTableHandler) Name() string {
return "CreateTable"
}

func (h *CreateTableHandler) Prepare(r *http.Request) (interface{}, int, error) {
aqContext, statusCode, err := aq_context.ParseAqContext(r.Context())
if err != nil {
return nil, statusCode, errors.Wrap(err, "Unable to parse arguments.")
}
tableName := r.Header.Get(routes.TableNameHeader)
integrationIdStr := chi.URLParam(r, routes.IntegrationIdUrlParam)
integrationId, err := uuid.Parse(integrationIdStr)
if err != nil {
return nil, http.StatusBadRequest, errors.Wrap(err, "Malformed integration ID.")
}

csv, err := io.ReadAll(r.Body)
if err != nil {
return nil, http.StatusBadRequest, errors.Wrap(err, "Unable to read CSV content.")
}

return &CreateTableArgs{
AqContext: aqContext,
tableName: tableName,
integrationId: integrationId,
csv: csv,
}, http.StatusOK, nil
}

func (h *CreateTableHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
args := interfaceArgs.(*CreateTableArgs)

integrationObject, err := h.IntegrationReader.GetIntegration(
ctx,
args.integrationId,
h.Database,
)
if err != nil {
return nil, http.StatusBadRequest, errors.Wrap(err, "Cannot get integration.")
}

// Save CSV
contentPath := fmt.Sprintf("create-table-content-%s", args.RequestId)
csvStorage := storage.NewStorage(h.StorageConfig)
if err := csvStorage.Put(ctx, contentPath, args.csv); err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Cannot save CSV.")
}

var returnErr error = nil
returnStatus := http.StatusOK

defer func() {
if deleteErr := csvStorage.Delete(ctx, contentPath); deleteErr != nil {
returnErr = errors.Wrap(deleteErr, "Error deleting CSV from temporary storage.")
returnStatus = http.StatusInternalServerError
}
}()

emptyResp := CreateTableResponse{}

if statusCode, err := CreateTable(ctx, args, contentPath, integrationObject, h.Vault, h.StorageConfig, h.JobManager); err != nil {
return emptyResp, statusCode, err
}

return emptyResp, returnStatus, returnErr
}

// CreateTable adds the CSV as a table in the database. It returns a status code for the request
// and an error, if any.
func CreateTable(ctx context.Context, args *CreateTableArgs, contentPath string, integrationObject *integration.Integration, vaultObject vault.Vault, storageConfig *shared.StorageConfig, jobManager job.JobManager) (int, error) {
// Schedule load table job
jobMetadataPath := fmt.Sprintf("create-table-%s", args.RequestId)

jobName := fmt.Sprintf("create-table-operator-%s", uuid.New().String())

config, err := auth.ReadConfigFromSecret(ctx, integrationObject.Id, vaultObject)
if err != nil {
return http.StatusInternalServerError, errors.Wrap(err, "Unable to launch create table job.")
}

// Assuming service supports GenericRelationalDBLoadParams
loadParameters := &connector.GenericRelationalDBLoadParams{
RelationalDBLoadParams: connector.RelationalDBLoadParams{
Table: args.tableName,
UpdateMode: "fail",
},
}

jobSpec := job.NewLoadTableSpec(
jobName,
contentPath,
storageConfig,
jobMetadataPath,
integrationObject.Service,
config,
loadParameters,
"",
"",
)
if err := jobManager.Launch(ctx, jobName, jobSpec); err != nil {
return http.StatusInternalServerError, errors.Wrap(err, "Unable to launch create table job.")
}

jobStatus, err := job.PollJob(ctx, jobName, jobManager, pollCreateInterval, pollCreateTimeout)
if err != nil {
return http.StatusInternalServerError, errors.Wrap(err, "Unable to create table.")
}

if jobStatus == shared.SucceededExecutionStatus {
// Table creation was successful
return http.StatusOK, nil
}

// Table creation failed, so we need to fetch the error message from storage
var metadata operator_result.Metadata
if err := utils.ReadFromStorage(
ctx,
storageConfig,
jobMetadataPath,
&metadata,
); err != nil {
return http.StatusInternalServerError, errors.Wrap(err, "Unable to create table.")
}

return http.StatusBadRequest, errors.Newf("Unable to create table: %v", metadata.Error)
}
33 changes: 32 additions & 1 deletion src/golang/cmd/server/handler/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"time"

"github.com/aqueducthq/aqueduct/cmd/server/queries"
"github.com/aqueducthq/aqueduct/cmd/server/routes"
"github.com/aqueducthq/aqueduct/lib/collections/integration"
"github.com/aqueducthq/aqueduct/lib/collections/operator_result"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/job"
"github.com/aqueducthq/aqueduct/lib/vault"
"github.com/aqueducthq/aqueduct/lib/workflow/operator/connector"
"github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/auth"
workflow_utils "github.com/aqueducthq/aqueduct/lib/workflow/utils"
"github.com/dropbox/godropbox/errors"
Expand Down Expand Up @@ -51,6 +53,7 @@ type DiscoverHandler struct {

Database database.Database
IntegrationReader integration.Reader
CustomReader queries.Reader
StorageConfig *shared.StorageConfig
JobManager job.JobManager
Vault vault.Vault
Expand Down Expand Up @@ -170,7 +173,35 @@ func (h *DiscoverHandler) Perform(
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to retrieve table names from storage.")
}

loadOperatorMetadata, err := h.CustomReader.GetLoadOperatorSpecByOrganization(
ctx,
args.OrganizationId,
h.Database,
)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to get load operators.")
}

// All user-created tables.
userTables := make(map[string]bool, len(loadOperatorMetadata))
for _, loadOperator := range loadOperatorMetadata {
loadSpec, ok := connector.CastToRelationalDBLoadParams(loadOperator.Spec.Load().Parameters)
if !ok {
return nil, http.StatusInternalServerError, errors.Newf("Cannot load table")
}
table := loadSpec.Table
userTables[table] = true
}

baseTables := make([]string, 0, len(tableNames))

for _, tableName := range tableNames {
if isUserTable := userTables[tableName]; !isUserTable { // not a user-created table
baseTables = append(baseTables, tableName)
}
}

return discoverResponse{
TableNames: tableNames,
TableNames: baseTables,
}, http.StatusOK, nil
}
1 change: 1 addition & 0 deletions src/golang/cmd/server/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
ListIntegrationsRoute = "/api/integrations"
ConnectIntegrationRoute = "/api/integration/connect"
DeleteIntegrationRoute = "/api/integration/{integrationId}/delete"
CreateTableRoute = "/api/integration/{integrationId}/create"
PreviewTableRoute = "/api/integration/{integrationId}/preview_table"
DiscoverRoute = "/api/integration/{integrationId}/discover"

Expand Down
8 changes: 8 additions & 0 deletions src/golang/cmd/server/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ func (s *AqServer) Handlers() map[string]handler.Handler {
Vault: s.Vault,
},
routes.DiscoverRoute: &handler.DiscoverHandler{
Database: s.Database,
CustomReader: s.CustomReader,
IntegrationReader: s.IntegrationReader,
StorageConfig: s.StorageConfig,
JobManager: s.JobManager,
Vault: s.Vault,
},
routes.CreateTableRoute: &handler.CreateTableHandler{
Database: s.Database,
IntegrationReader: s.IntegrationReader,
StorageConfig: s.StorageConfig,
Expand Down
39 changes: 39 additions & 0 deletions src/golang/internal/server/routes/routes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package routes

// Please sort the route by their VALUEs
const (
GetArtifactVersionsRoute = "/artifact_versions"
GetArtifactResultRoute = "/artifact_result/{workflowDagResultId}/{artifactId}"

ListBuiltinFunctionsRoute = "/builtinFunctions"
GetFunctionRoute = "/function/{functionId}"
ExportFunctionRoute = "/function/{operatorId}/export"

ListIntegrationsRoute = "/integrations"
ConnectIntegrationRoute = "/integration/connect"
DeleteIntegrationRoute = "/integration/{integrationId}/delete"
CreateTableRoute = "/integration/{integrationId}/create"
PreviewTableRoute = "/integration/{integrationId}/preview_table"
DiscoverRoute = "/integration/{integrationId}/discover"

ResetApiKeyRoute = "/keys/reset"

ListNotificationsRoute = "/notifications"
ArchiveNotificationRoute = "/notifications/{notificationId}/archive"

GetOperatorResultRoute = "/operator_result/{workflowDagResultId}/{operatorId}"

GetNodePositionsRoute = "/positioning"
PreviewRoute = "/preview"

GetUserProfileRoute = "/user"

ListWorkflowsRoute = "/workflows"
RegisterWorkflowRoute = "/workflow/register"
GetWorkflowRoute = "/workflow/{workflowId}"
DeleteWorkflowRoute = "/workflow/{workflowId}/delete"
EditWorkflowRoute = "/workflow/{workflowId}/edit"
RefreshWorkflowRoute = "/workflow/{workflowId}/refresh"
UnwatchWorkflowRoute = "/workflow/{workflowId}/unwatch"
WatchWorkflowRoute = "/workflow/{workflowId}/watch"
)
3 changes: 2 additions & 1 deletion src/golang/lib/job/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (j *ProcessJobManager) mapJobTypeToCmd(jobName string, spec Spec) (*exec.Cm
} else if spec.Type() == AuthenticateJobType ||
spec.Type() == LoadJobType ||
spec.Type() == ExtractJobType ||
spec.Type() == LoadTableJobType ||
spec.Type() == DiscoverJobType {
specStr, err := EncodeSpec(spec, JsonSerializationType)
if err != nil {
Expand Down Expand Up @@ -254,10 +255,10 @@ func (j *ProcessJobManager) Launch(
}

cmd, err := j.mapJobTypeToCmd(name, spec)
cmd.Env = os.Environ()
if err != nil {
return err
}
cmd.Env = os.Environ()

stdout := &bytes.Buffer{}
stderr := &bytes.Buffer{}
Expand Down
Loading

0 comments on commit e6bbc9b

Please sign in to comment.