Skip to content

feat: Neo4j CSV to RDF Converter #7545

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions contrib/neo4j-converter/Neo4jCSVToRDFConverter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package main

import (
"bufio"
"bytes"
"encoding/csv"
"errors"
"flag"
"fmt"
"github.com/dgraph-io/dgraph/x"
"io"
"log"
"os"
"path/filepath"
"strings"
"time"
)

var (
inputPath = flag.String("input", "", "Please provide the input csv file.")
outputPath = flag.String("output", "", "Where to place the output?")
)

func main() {
flag.Parse()
//check input path length
if len(*inputPath) == 0 {
log.Fatal("Please set the input argument.")
}
//check output path length
if len(*outputPath) == 0 {
log.Fatal("Please set the output argument.")
}
fmt.Printf("CSV to convert: %q ?[y/n]", *inputPath)

var inputConf, outputConf string
x.Check2(fmt.Scanf("%s", &inputConf))

fmt.Printf("Output directory wanted: %q ?[y/n]", *outputPath)
x.Check2(fmt.Scanf("%s", &outputConf))

if inputConf != "y" || outputConf != "y" {
fmt.Println("Please update the directories")
return
}

//open the file
ifile, err := os.Open(*inputPath)
x.Check(err)
defer ifile.Close()
//log the start time
ts := time.Now().UnixNano()

//create output file in append mode
outputName := filepath.Join(*outputPath, fmt.Sprintf("converted_%d.rdf", ts))
oFile, err := os.OpenFile(outputName, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
x.Check(err)
defer func() {
oFile.Close()
}()
//process the file
x.Check(processNeo4jCSV(ifile, oFile))
fmt.Printf("Finished writing %q", outputName)

}

// func processCSV(inputFile string, outputPath string) {
func processNeo4jCSV(r io.Reader, w io.Writer) error {

scanner := bufio.NewScanner(r)
scanner.Split(bufio.ScanLines)
var text, rdfLines bytes.Buffer

header := make(map[int]string)
positionOfStart, startPositionOfProperty := -1, -1

//read header
readHeader := func() {
h := csv.NewReader(strings.NewReader(scanner.Text()))
line, _ := h.Read()
//read headers
for position, fieldName := range line {
header[position] = fieldName

if fieldName == "_start" {
positionOfStart = position
} else if fieldName == "_type" {
startPositionOfProperty = position + 1
}
}
}

// Scan and read the header.
scanner.Scan()
readHeader()
//ensure that header exists
if positionOfStart == -1 {
return errors.New("column '_start' is absent in file")
}

// Read the actual data.
for scanner.Scan() {
//parse csv
text.WriteString(scanner.Text() + "\n")
d := csv.NewReader(strings.NewReader(text.String()))
records, err := d.ReadAll()
x.Check(err)

linkStartNode := ""
linkEndNode := ""
linkName := ""
facets := make(map[string]string)

line := records[0]
for position := 0; position < len(line); position++ {

// This is an _id node.
if len(line[0]) > 0 {
bn := fmt.Sprintf("<_:k_%s>", line[0])
if position < positionOfStart && position > 0 {
//write non-facet data
rdfLines.WriteString(fmt.Sprintf("%s <%s> \"%s\" .\n",
bn, header[position], line[position]))
}
continue
}
// Handle relationship data.
if position >= positionOfStart {
if header[position] == "_start" {
linkStartNode = fmt.Sprintf("<_:k_%s>", line[position])
} else if header[position] == "_end" {
linkEndNode = fmt.Sprintf("<_:k_%s>", line[position])
} else if header[position] == "_type" {
linkName = fmt.Sprintf("<%s>", line[position])
} else if position >= startPositionOfProperty {
//collect facets
facets[header[position]] = line[position]
}
continue
}
}
//write the facets
if len(linkName) > 0 {
facetLine := ""
atleastOneFacetExists := false
for facetName, facetValue := range facets {
if len(facetValue) == 0 {
continue
}
//strip [ ], and assume only one value
facetValue = strings.Replace(facetValue, "[", "", 1)
facetValue = strings.Replace(facetValue, "]", "", 1)
if atleastOneFacetExists {
//insert a comma to separate multiple facets
facetLine = fmt.Sprintf("%s, ", facetLine)
}
//write the actual facet
facetLine = fmt.Sprintf("%s %s=%s", facetLine, facetName, facetValue)
atleastOneFacetExists = true
}
if atleastOneFacetExists {
//wrap all facets with round brackets
facetLine = fmt.Sprintf("( %s )", facetLine)
}
rdfLines.WriteString(fmt.Sprintf("%s %s %s %s.\n", linkStartNode, linkName, linkEndNode, facetLine))
}

text.Reset()
//write a chunk when ready
if rdfLines.Len() > 100<<20 {
// Flush the writes and reset the rdfLines
x.Check2(w.Write(rdfLines.Bytes()))
rdfLines.Reset()
}
}
x.Check2(w.Write(rdfLines.Bytes()))
return nil
}
53 changes: 53 additions & 0 deletions contrib/neo4j-converter/Neo4jConverter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"bytes"
"fmt"
"github.com/stretchr/testify/require"
"io/ioutil"
"strings"
"testing"
)

func TestParsingHeader(t *testing.T) {
i := strings.NewReader("my request")
buf := new(bytes.Buffer)
err := processNeo4jCSV(i, buf)
require.Contains(t, err.Error(), "column '_start' is absent")
//log output
fmt.Println(buf.String())
}


func TestSingleLineFileString(t *testing.T) {
header:=`"_id","_labels","born","name","released","tagline","title","_start","_end","_type","roles"`
detail := `"188",":Movie","","","1999","Welcome to the Real World","The Matrix",,,,`
fileLines := fmt.Sprintf("%s\n%s",header, detail)

i := strings.NewReader(fileLines)
buf := new(bytes.Buffer)
processNeo4jCSV(i, buf)
require.Contains(t, buf.String(), "<_:k_188> <_labels> \":Movie\" .")
}

func TestWholeFile(t *testing.T){
inBuf,_:= ioutil.ReadFile("./example.csv")
i := strings.NewReader(string(inBuf))
buf := new(bytes.Buffer)
processNeo4jCSV(i, buf)
//check id
require.Contains(t, buf.String(), "<_:k_188> <_labels> \":Movie\" .")
//check facets
require.Contains(t, buf.String(),"<_:k_191> <ACTED_IN> <_:k_188> ( roles=\"Morpheus\" )")
//check link w/o facets
require.Contains(t, buf.String(),"<_:k_193> <DIRECTED> <_:k_188>")
}

func BenchmarkSampleFile(b *testing.B) {
inBuf,_:= ioutil.ReadFile("./example.csv")
i := strings.NewReader(string(inBuf))
for k := 0; k < b.N; k++ {
buf := new(bytes.Buffer)
processNeo4jCSV(i, buf)
}
}
16 changes: 16 additions & 0 deletions contrib/neo4j-converter/example.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"_id","_labels","born","name","released","tagline","title","_start","_end","_type","roles"
"188",":Movie","","","1999","Welcome to the Real World","The Matrix",,,,
"189",":Person","1964","Keanu Reeves","","","",,,,
"190",":Person","1967","Carrie-Anne Moss","","","",,,,
"191",":Person","1961","Laurence Fishburne","","","",,,,
"192",":Person","1960","Hugo Weaving","","","",,,,
"193",":Person","1967","Lilly Wachowski","","","",,,,
"194",":Person","1965","Lana Wachowski","","","",,,,
"195",":Person","1952","Joel Silver","","","",,,,
,,,,,,,"189","188","ACTED_IN","[""Neo""]"
,,,,,,,"190","188","ACTED_IN","[""Trinity""]"
,,,,,,,"191","188","ACTED_IN","[""Morpheus""]"
,,,,,,,"192","188","ACTED_IN","[""Agent Smith""]"
,,,,,,,"193","188","DIRECTED",""
,,,,,,,"194","188","DIRECTED",""
,,,,,,,"195","188","PRODUCED",""