Skip to content

Commit

Permalink
feat: Neo4j CSV to RDF Converter (#7545)
Browse files Browse the repository at this point in the history
* feat: Neo4j to CSV Converter

* Fixed formatting issues

* Merged review feedback and added some tests

* Completed test cases and benchmarks

* Added more test cases, and formatting

* go fmt pretty print (CI build failed because of this)
  • Loading branch information
anand-chandrashekar authored Apr 23, 2021
1 parent 6b447b5 commit 8b1cfdc
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 0 deletions.
185 changes: 185 additions & 0 deletions contrib/neo4j-converter/Neo4jCSVToRDFConverter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package main

import (
"bufio"
"bytes"
"encoding/csv"
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"time"
)

var (
inputPath = flag.String("input", "", "Please provide the input csv file.")
outputPath = flag.String("output", "", "Where to place the output?")
)

func main() {
flag.Parse()
//check input path length
if len(*inputPath) == 0 {
log.Fatal("Please set the input argument.")
}
//check output path length
if len(*outputPath) == 0 {
log.Fatal("Please set the output argument.")
}
fmt.Printf("CSV to convert: %q ?[y/n]", *inputPath)

var inputConf, outputConf string
check2(fmt.Scanf("%s", &inputConf))

fmt.Printf("Output directory wanted: %q ?[y/n]", *outputPath)
check2(fmt.Scanf("%s", &outputConf))

if inputConf != "y" || outputConf != "y" {
fmt.Println("Please update the directories")
return
}

//open the file
ifile, err := os.Open(*inputPath)
check(err)
defer ifile.Close()
//log the start time
ts := time.Now().UnixNano()

//create output file in append mode
outputName := filepath.Join(*outputPath, fmt.Sprintf("converted_%d.rdf", ts))
oFile, err := os.OpenFile(outputName, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
check(err)
defer oFile.Close()
//process the file
check(processNeo4jCSV(ifile, oFile))
fmt.Printf("Finished writing %q", outputName)

}

func processNeo4jCSV(r io.Reader, w io.Writer) error {

scanner := bufio.NewScanner(r)
scanner.Split(bufio.ScanLines)
var text, rdfLines bytes.Buffer

header := make(map[int]string)
positionOfStart, startPositionOfProperty := -1, -1

//read header
readHeader := func() {
h := csv.NewReader(strings.NewReader(scanner.Text()))
line, _ := h.Read()
//read headers
for position, fieldName := range line {
header[position] = fieldName

if fieldName == "_start" {
positionOfStart = position
} else if fieldName == "_type" {
startPositionOfProperty = position + 1
}
}
}

// Scan and read the header.
scanner.Scan()
readHeader()
//ensure that header exists
if positionOfStart == -1 {
return errors.New("column '_start' is absent in file")
}

// Read the actual data.
for scanner.Scan() {
//parse csv
text.WriteString(scanner.Text() + "\n")
d := csv.NewReader(strings.NewReader(text.String()))
records, err := d.ReadAll()
check(err)

linkStartNode := ""
linkEndNode := ""
linkName := ""
facets := make(map[string]string)

line := records[0]
for position := 0; position < len(line); position++ {

// This is an _id node.
if len(line[0]) > 0 {
bn := fmt.Sprintf("<_:k_%s>", line[0])
if position < positionOfStart && position > 0 {
//write non-facet data
rdfLines.WriteString(fmt.Sprintf("%s <%s> \"%s\" .\n",
bn, header[position], line[position]))
}
continue
}
// Handle relationship data.
if position >= positionOfStart {
if header[position] == "_start" {
linkStartNode = fmt.Sprintf("<_:k_%s>", line[position])
} else if header[position] == "_end" {
linkEndNode = fmt.Sprintf("<_:k_%s>", line[position])
} else if header[position] == "_type" {
linkName = fmt.Sprintf("<%s>", line[position])
} else if position >= startPositionOfProperty {
//collect facets
facets[header[position]] = line[position]
}
continue
}
}
//write the facets
if len(linkName) > 0 {
facetLine := ""
atleastOneFacetExists := false
for facetName, facetValue := range facets {
if len(facetValue) == 0 {
continue
}
//strip [ ], and assume only one value
facetValue = strings.Replace(facetValue, "[", "", 1)
facetValue = strings.Replace(facetValue, "]", "", 1)
if atleastOneFacetExists {
//insert a comma to separate multiple facets
facetLine = fmt.Sprintf("%s, ", facetLine)
}
//write the actual facet
facetLine = fmt.Sprintf("%s %s=%s", facetLine, facetName, facetValue)
atleastOneFacetExists = true
}
if atleastOneFacetExists {
//wrap all facets with round brackets
facetLine = fmt.Sprintf("( %s )", facetLine)
}
rdfLines.WriteString(fmt.Sprintf("%s %s %s %s .\n",
linkStartNode, linkName, linkEndNode, facetLine))
}

text.Reset()
//write a chunk when ready
if rdfLines.Len() > 100<<20 {
// Flush the writes and reset the rdfLines
check2(w.Write(rdfLines.Bytes()))
rdfLines.Reset()
}
}
check2(w.Write(rdfLines.Bytes()))
return nil
}
func check2(_ interface{}, err error) {
if err != nil {
log.Fatal(err)
}
}
func check(err error) {
if err != nil {
log.Fatal(err)
}
}
75 changes: 75 additions & 0 deletions contrib/neo4j-converter/Neo4jConverter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"bytes"
"fmt"
"github.com/sergi/go-diff/diffmatchpatch"
"github.com/stretchr/testify/require"
"io/ioutil"
"strings"
"testing"
)

func TestParsingHeader(t *testing.T) {
i := strings.NewReader("my request")
buf := new(bytes.Buffer)
require.Error(t, processNeo4jCSV(i, buf), "column '_start' is absent in file")
}

func TestSingleLineFileString(t *testing.T) {
header := `"_id","_labels","born","name","released","tagline"` +
`,"title","_start","_end","_type","roles"`
detail := `"188",":Movie","","","1999","Welcome to the Real World","The Matrix",,,,`
fileLines := fmt.Sprintf("%s\n%s", header, detail)
output := `<_:k_188> <_labels> ":Movie" .
<_:k_188> <born> "" .
<_:k_188> <name> "" .
<_:k_188> <released> "1999" .
<_:k_188> <tagline> "Welcome to the Real World" .
<_:k_188> <title> "The Matrix" .
`
i := strings.NewReader(fileLines)
buf := new(bytes.Buffer)
processNeo4jCSV(i, buf)
require.Equal(t, buf.String(), output)
}

func TestWholeFile(t *testing.T) {
goldenFile := "./output.rdf"
inBuf, _ := ioutil.ReadFile("./example.csv")
i := strings.NewReader(string(inBuf))
buf := new(bytes.Buffer)
processNeo4jCSV(i, buf)
//check id
require.Contains(t, buf.String(), "<_:k_188> <_labels> \":Movie\" .")
//check facets
require.Contains(t, buf.String(),
"<_:k_191> <ACTED_IN> <_:k_188> ( roles=\"Morpheus\" )")
//check link w/o facets
require.Contains(t, buf.String(), "<_:k_193> <DIRECTED> <_:k_188>")

//check full file
expected, err := ioutil.ReadFile(goldenFile)
if err != nil {
// Handle error
}
isSame := bytes.Equal(expected, buf.Bytes())
if !isSame {
fmt.Println("Printing comparison")
dmp := diffmatchpatch.New()
diffs := dmp.DiffMain(string(expected), buf.String(), true)
fmt.Println(dmp.DiffPrettyText(diffs))
}
require.True(t, isSame)

}

func BenchmarkSampleFile(b *testing.B) {
inBuf, _ := ioutil.ReadFile("./example.csv")
i := strings.NewReader(string(inBuf))
buf := new(bytes.Buffer)
for k := 0; k < b.N; k++ {
processNeo4jCSV(i, buf)
buf.Reset()
}
}
16 changes: 16 additions & 0 deletions contrib/neo4j-converter/example.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"_id","_labels","born","name","released","tagline","title","_start","_end","_type","roles"
"188",":Movie","","","1999","Welcome to the Real World","The Matrix",,,,
"189",":Person","1964","Keanu Reeves","","","",,,,
"190",":Person","1967","Carrie-Anne Moss","","","",,,,
"191",":Person","1961","Laurence Fishburne","","","",,,,
"192",":Person","1960","Hugo Weaving","","","",,,,
"193",":Person","1967","Lilly Wachowski","","","",,,,
"194",":Person","1965","Lana Wachowski","","","",,,,
"195",":Person","1952","Joel Silver","","","",,,,
,,,,,,,"189","188","ACTED_IN","[""Neo""]"
,,,,,,,"190","188","ACTED_IN","[""Trinity""]"
,,,,,,,"191","188","ACTED_IN","[""Morpheus""]"
,,,,,,,"192","188","ACTED_IN","[""Agent Smith""]"
,,,,,,,"193","188","DIRECTED",""
,,,,,,,"194","188","DIRECTED",""
,,,,,,,"195","188","PRODUCED",""
55 changes: 55 additions & 0 deletions contrib/neo4j-converter/output.rdf
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<_:k_188> <_labels> ":Movie" .
<_:k_188> <born> "" .
<_:k_188> <name> "" .
<_:k_188> <released> "1999" .
<_:k_188> <tagline> "Welcome to the Real World" .
<_:k_188> <title> "The Matrix" .
<_:k_189> <_labels> ":Person" .
<_:k_189> <born> "1964" .
<_:k_189> <name> "Keanu Reeves" .
<_:k_189> <released> "" .
<_:k_189> <tagline> "" .
<_:k_189> <title> "" .
<_:k_190> <_labels> ":Person" .
<_:k_190> <born> "1967" .
<_:k_190> <name> "Carrie-Anne Moss" .
<_:k_190> <released> "" .
<_:k_190> <tagline> "" .
<_:k_190> <title> "" .
<_:k_191> <_labels> ":Person" .
<_:k_191> <born> "1961" .
<_:k_191> <name> "Laurence Fishburne" .
<_:k_191> <released> "" .
<_:k_191> <tagline> "" .
<_:k_191> <title> "" .
<_:k_192> <_labels> ":Person" .
<_:k_192> <born> "1960" .
<_:k_192> <name> "Hugo Weaving" .
<_:k_192> <released> "" .
<_:k_192> <tagline> "" .
<_:k_192> <title> "" .
<_:k_193> <_labels> ":Person" .
<_:k_193> <born> "1967" .
<_:k_193> <name> "Lilly Wachowski" .
<_:k_193> <released> "" .
<_:k_193> <tagline> "" .
<_:k_193> <title> "" .
<_:k_194> <_labels> ":Person" .
<_:k_194> <born> "1965" .
<_:k_194> <name> "Lana Wachowski" .
<_:k_194> <released> "" .
<_:k_194> <tagline> "" .
<_:k_194> <title> "" .
<_:k_195> <_labels> ":Person" .
<_:k_195> <born> "1952" .
<_:k_195> <name> "Joel Silver" .
<_:k_195> <released> "" .
<_:k_195> <tagline> "" .
<_:k_195> <title> "" .
<_:k_189> <ACTED_IN> <_:k_188> ( roles="Neo" ) .
<_:k_190> <ACTED_IN> <_:k_188> ( roles="Trinity" ) .
<_:k_191> <ACTED_IN> <_:k_188> ( roles="Morpheus" ) .
<_:k_192> <ACTED_IN> <_:k_188> ( roles="Agent Smith" ) .
<_:k_193> <DIRECTED> <_:k_188> .
<_:k_194> <DIRECTED> <_:k_188> .
<_:k_195> <PRODUCED> <_:k_188> .
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ require (
github.com/prometheus/client_golang v0.9.3
github.com/prometheus/common v0.4.1 // indirect
github.com/prometheus/procfs v0.0.0-20190517135640-51af30a78b0e // indirect
github.com/sergi/go-diff v1.1.0
github.com/soheilhy/cmux v0.1.4
github.com/spf13/cast v1.3.0
github.com/spf13/cobra v0.0.5
Expand Down

0 comments on commit 8b1cfdc

Please sign in to comment.