Skip to content

Commit

Permalink
Updated uploader and database methods
Browse files Browse the repository at this point in the history
Changed connectDB() to use non-deprecated methods, and fixed many issues in uploader (currently still broken)
  • Loading branch information
mohammadmehrab committed Apr 23, 2024
1 parent 567572c commit efb0f9c
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 64 deletions.
11 changes: 4 additions & 7 deletions uploader/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@ import (
)

func connectDB() *mongo.Client {
client, err := mongo.NewClient(options.Client().ApplyURI(getEnvMongoURI()))
if err != nil {
log.Panic("Unable to create MongoDB client")
os.Exit(1)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

err = client.Connect(ctx)
opts := options.Client().ApplyURI(getEnvMongoURI())

client, err := mongo.Connect(ctx, opts)
if err != nil {
log.Panic("Unable to connect to database")
log.Panic("Unable to create MongoDB client and connect to database")
os.Exit(1)
}

Expand Down
169 changes: 112 additions & 57 deletions uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"fmt"
"log"
"os"

"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/UTDNebula/nebula-api/api/schema"
"github.com/joho/godotenv"
Expand All @@ -26,7 +29,7 @@ import (
// Also note that this uploader assumes that the collection names match the names of these files, which they should.
// If the names of these collections ever change, the file names should be updated accordingly.

var filesToUpload []string = []string{"courses.json", "professors.json", "sections.json"}
var filesToUpload [3]string = [3]string{"courses.json", "professors.json", "sections.json"}

func Upload(inDir string, replace bool) {

Expand All @@ -39,7 +42,7 @@ func Upload(inDir string, replace bool) {
client := connectDB()

// Get context
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

for _, path := range filesToUpload {
Expand All @@ -51,7 +54,7 @@ func Upload(inDir string, replace bool) {

switch path {
case "courses.json":
fmt.Println("Uploading courses.json ...")
log.Println("Uploading courses.json ...")

// Decode courses from courses.json
var courses []schema.Course
Expand All @@ -62,49 +65,67 @@ func Upload(inDir string, replace bool) {
}

if replace {
var empty interface{}

// Get collection
collection := getCollection(client, "courses")

// Delete all documents from collection
_, err := collection.DeleteMany(ctx, empty)
_, err := collection.DeleteMany(ctx, bson.D{})
if err != nil {
log.Panic(err)
}

// Add all documents decoded from courses.json into Mongo collection
for _, course := range courses {
_, err := collection.InsertOne(ctx, course)
if err != nil {
log.Panic(err)
}
// Convert your courses to []interface{}
courseDocs := make([]interface{}, len(courses))
for i := range courses {
courseDocs[i] = courses[i]
}

// Add all documents decoded from courses.json into the temporary collection
opts := options.InsertMany().SetOrdered(false)
_, err = collection.InsertMany(ctx, courseDocs, opts)
if err != nil {
log.Panic(err)
}

} else {

// If a temp collection already exists, drop it
tempCollection := getCollection(client, "temp")
err = tempCollection.Drop(ctx)
if err != nil {
log.Panic(err)
}

// Create a temporary collection
err := client.Database("combinedDB").CreateCollection(ctx, "temp")
if err != nil {
log.Panic(err)
}

// Get the temporary collection
tempCollection := getCollection(client, "temp")
tempCollection = getCollection(client, "temp")

// Convert your courses to []interface{}
courseDocs := make([]interface{}, len(courses))
for i := range courses {
courseDocs[i] = courses[i]
}

// Add all documents decoded from courses.json into the temporary collection
for _, course := range courses {
_, err := tempCollection.InsertOne(ctx, course)
if err != nil {
log.Panic(err)
}
opts := options.InsertMany().SetOrdered(false)
_, err = tempCollection.InsertMany(ctx, courseDocs, opts)
if err != nil {
log.Panic(err)
}

// Create a merge aggregate pipeline
// Matched documents from the temporary collection will replace matched documents from the Mongo collection
// Unmatched documents from the temporary collection will be inserted into the Mongo collection
mergeStage := bson.D{primitive.E{Key: "$merge", Value: bson.D{primitive.E{Key: "into", Value: "courses"}, primitive.E{Key: "on", Value: []string{"catalog_year", "course_number", "subject_prefix"}}, primitive.E{Key: "whenMatched", Value: "replace"}, primitive.E{Key: "whenNotMatched", Value: "insert"}}}}
mergeStage := bson.D{primitive.E{Key: "$merge", Value: bson.D{primitive.E{Key: "into", Value: "courses"}, primitive.E{Key: "on", Value: [3]string{"catalog_year", "course_number", "subject_prefix"}}, primitive.E{Key: "whenMatched", Value: "replace"}, primitive.E{Key: "whenNotMatched", Value: "insert"}}}}

// Execute aggregate pipeline
_, err = tempCollection.Aggregate(ctx, mergeStage)
_, err = tempCollection.Aggregate(ctx, mongo.Pipeline{mergeStage})
if err != nil {
log.Panic(err)
}
Expand All @@ -116,10 +137,10 @@ func Upload(inDir string, replace bool) {
}
}

fmt.Println("Done uploading courses.json!")
log.Println("Done uploading courses.json!")

case "professors.json":
fmt.Println("Uploading professors.json ...")
log.Println("Uploading professors.json ...")

// Decode courses from professors.json
var professors []schema.Professor
Expand All @@ -130,49 +151,67 @@ func Upload(inDir string, replace bool) {
}

if replace {
var empty interface{}

// Get collection
collection := getCollection(client, "professors")

// Delete all documents from collection
_, err := collection.DeleteMany(ctx, empty)
_, err := collection.DeleteMany(ctx, bson.D{})
if err != nil {
log.Panic(err)
}

// Add all documents decoded from professors.json into Mongo collection
for _, professor := range professors {
_, err := collection.InsertOne(ctx, professor)
if err != nil {
log.Panic(err)
}
// Convert your professors to []interface{}
professorsDocs := make([]interface{}, len(professors))
for i := range professors {
professorsDocs[i] = professors[i]
}

// Add all documents decoded from professors.json into the temporary collection
opts := options.InsertMany().SetOrdered(false)
_, err = collection.InsertMany(ctx, professorsDocs, opts)
if err != nil {
log.Panic(err)
}

} else {

// If a temp collection already exists, drop it
tempCollection := getCollection(client, "temp")
err = tempCollection.Drop(ctx)
if err != nil {
log.Panic(err)
}

// Create a temporary collection
err := client.Database("combinedDB").CreateCollection(ctx, "temp")
if err != nil {
log.Panic(err)
}

// Get the temporary collection
tempCollection := getCollection(client, "temp")
tempCollection = getCollection(client, "temp")

// Convert your professors to []interface{}
professorsDocs := make([]interface{}, len(professors))
for i := range professors {
professorsDocs[i] = professors[i]
}

// Add all documents decoded from professors.json into the temporary collection
for _, professor := range professors {
_, err = tempCollection.InsertOne(ctx, professor)
if err != nil {
log.Panic(err)
}
opts := options.InsertMany().SetOrdered(false)
_, err = tempCollection.InsertMany(ctx, professorsDocs, opts)
if err != nil {
log.Panic(err)
}

// Create a merge aggregate pipeline
// Matched documents from the temporary collection will replace matched documents from the Mongo collection
// Unmatched documents from the temporary collection will be inserted into the Mongo collection
mergeStage := bson.D{primitive.E{Key: "$merge", Value: bson.D{primitive.E{Key: "into", Value: "professors"}, primitive.E{Key: "on", Value: []string{"first_name", "last_name"}}, primitive.E{Key: "whenMatched", Value: "replace"}, primitive.E{Key: "whenNotMatched", Value: "insert"}}}}
mergeStage := bson.D{primitive.E{Key: "$merge", Value: bson.D{primitive.E{Key: "into", Value: "professors"}, primitive.E{Key: "on", Value: [2]string{"first_name", "last_name"}}, primitive.E{Key: "whenMatched", Value: "replace"}, primitive.E{Key: "whenNotMatched", Value: "insert"}}}}

// Execute aggregate pipeline
_, err = tempCollection.Aggregate(ctx, mergeStage)
_, err = tempCollection.Aggregate(ctx, mongo.Pipeline{mergeStage})
if err != nil {
log.Panic(err)
}
Expand All @@ -184,10 +223,10 @@ func Upload(inDir string, replace bool) {
}
}

fmt.Println("Done uploading professors.json!")
log.Println("Done uploading professors.json!")

case "sections.json":
fmt.Println("Uploading sections.json ...")
log.Println("Uploading sections.json ...")

// Decode courses from sections.json
var sections []schema.Section
Expand All @@ -198,49 +237,65 @@ func Upload(inDir string, replace bool) {
}

if replace {
var empty interface{}

// Get collection
collection := getCollection(client, "sections")

// Delete all documents from collection
_, err := collection.DeleteMany(ctx, empty)
_, err := collection.DeleteMany(ctx, bson.D{})
if err != nil {
log.Panic(err)
}

// Add all documents decoded from sections.json into Mongo collection
for _, section := range sections {
_, err := collection.InsertOne(ctx, section)
if err != nil {
log.Panic(err)
}
// Convert your sections to []interface{}
sectionsDocs := make([]interface{}, len(sections))
for i := range sections {
sectionsDocs[i] = sections[i]
}

// Add all documents decoded from sections.json into the temporary collection
opts := options.InsertMany().SetOrdered(false)
_, err = collection.InsertMany(ctx, sectionsDocs, opts)
if err != nil {
log.Panic(err)
}
} else {
// If a temp collection already exists, drop it
tempCollection := getCollection(client, "temp")
err = tempCollection.Drop(ctx)
if err != nil {
log.Panic(err)
}

// Create a temporary collection
err := client.Database("combinedDB").CreateCollection(ctx, "temp")
if err != nil {
log.Panic(err)
}

// Get the temporary collection
tempCollection := getCollection(client, "temp")
tempCollection = getCollection(client, "temp")

// Add all documents decoded from sections.json into the temporary collection
for _, section := range sections {
_, err := tempCollection.InsertOne(ctx, section)
if err != nil {
log.Panic(err)
}
// Convert your sections to []interface{}
sectionsDocs := make([]interface{}, len(sections))
for i := range sections {
sectionsDocs[i] = sections[i]
}

// Add all documents decoded from professors.json into the temporary collection
opts := options.InsertMany().SetOrdered(false)
_, err = tempCollection.InsertMany(ctx, sectionsDocs, opts)
if err != nil {
log.Panic(err)
}

// Create a merge aggregate pipeline
// Matched documents from the temporary collection will replace matched documents from the Mongo collection
// Unmatched documents from the temporary collection will be inserted into the Mongo collection
mergeStage := bson.D{primitive.E{Key: "$merge", Value: bson.D{primitive.E{Key: "into", Value: "sections"}, primitive.E{Key: "on", Value: []string{"section_number", "course_reference", "academic_session"}}, primitive.E{Key: "whenMatched", Value: "replace"}, primitive.E{Key: "whenNotMatched", Value: "insert"}}}}
mergeStage := bson.D{primitive.E{Key: "$merge", Value: bson.D{primitive.E{Key: "into", Value: "sections"}, primitive.E{Key: "on", Value: [3]string{"section_number", "course_reference", "academic_session"}}, primitive.E{Key: "whenMatched", Value: "replace"}, primitive.E{Key: "whenNotMatched", Value: "insert"}}}}

// Execute aggregate pipeline
_, err = tempCollection.Aggregate(ctx, mergeStage)
_, err = tempCollection.Aggregate(ctx, mongo.Pipeline{mergeStage})
if err != nil {
log.Panic(err)
}
Expand All @@ -252,7 +307,7 @@ func Upload(inDir string, replace bool) {
}
}

fmt.Println("Done uploading sections.json!")
log.Println("Done uploading sections.json!")
}

defer fptr.Close()
Expand Down

0 comments on commit efb0f9c

Please sign in to comment.