Skip to content

Commit

Permalink
Moves cities to Badger
Browse files Browse the repository at this point in the history
See #197
  • Loading branch information
cuducos committed Apr 13, 2024
1 parent 24ca306 commit 7ffb4cd
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 60 deletions.
34 changes: 17 additions & 17 deletions transform/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

const badgerFilePrefix = "minha-receita-badger-"

func removeLeadingZeros(n string) string {
func noLeadingZeros(n string) string {
var o string
isZero := true
for _, ch := range n {
Expand All @@ -25,37 +25,37 @@ func removeLeadingZeros(n string) string {
return o
}

func keyForMotives(n string) string { return fmt.Sprintf("motives%s", removeLeadingZeros(n)) }
func keyForPartners(n string) string { return fmt.Sprintf("partners%s", n) }
func keyForBase(n string) string { return fmt.Sprintf("base%s", n) }
func keyForTaxes(n string) string { return fmt.Sprintf("taxes%s", n) }
func keyForLookup(s sourceType, n string) string { return fmt.Sprintf("%s%s", s, noLeadingZeros(n)) }
func keyForPartners(n string) string { return fmt.Sprintf("partners%s", n) }
func keyForBase(n string) string { return fmt.Sprintf("base%s", n) }
func keyForTaxes(n string) string { return fmt.Sprintf("taxes%s", n) }

// functions to read data from Badger

func motivesOf(db *badger.DB, i int) (string, bool, error) {
var m string
found := false
n := fmt.Sprintf("%d", i)
func lookupOf(db *badger.DB, s sourceType, i int) (string, bool, error) {
var v string
ok := false
err := db.View(func(txn *badger.Txn) error {
i, err := txn.Get([]byte(keyForMotives(n)))
k := keyForLookup(s, fmt.Sprintf("%d", i))
r, err := txn.Get([]byte(k))
if errors.Is(err, badger.ErrKeyNotFound) {
return nil
}
if err != nil {
return fmt.Errorf("could not get key %s: %w", keyForMotives(n), err)
return fmt.Errorf("could not get key %s: %w", k, err)
}
v, err := i.ValueCopy(nil)
b, err := r.ValueCopy(nil)
if err != nil {
return fmt.Errorf("could not read value for key %s: %w", keyForMotives(n), err)
return fmt.Errorf("could not read value for key %s: %w", k, err)
}
m = string(v)
found = true
v = string(b)
ok = true
return nil
})
if err != nil {
return m, false, fmt.Errorf("error getting motive for %s: %w", n, err)
return v, false, fmt.Errorf("error getting %s for %d: %w", s, i, err)
}
return m, found, nil
return v, ok, nil
}

func partnersOf(db *badger.DB, n string) ([]partnerData, error) {
Expand Down
2 changes: 1 addition & 1 deletion transform/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestRemoveLeadingZeros(t *testing.T) {
{"test", "test"},
{"0", "0"},
} {
got := removeLeadingZeros(tc.value)
got := noLeadingZeros(tc.value)
if got != tc.expected {
t.Errorf("expected %s, got %s", tc.expected, got)
}
Expand Down
2 changes: 1 addition & 1 deletion transform/company.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func newCompany(row []string, l *lookups, kv kvStorage, privacy bool) (company,
}
c.DataSituacaoCadastral = dataSituacaoCadastral

if err := c.motivoSituacaoCadastral(l, row[7]); err != nil {
if err := c.motivoSituacaoCadastral(row[7]); err != nil {
return c, fmt.Errorf("error trying to parse MotivoSituacaoCadastral: %w", err)
}

Expand Down
120 changes: 88 additions & 32 deletions transform/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ type item struct {
kind sourceType
}

func lookupHandler(_ *lookups, r []string) ([]byte, error) { return []byte(r[1]), nil }

func newKVItem(s sourceType, l *lookups, r []string) (i item, err error) {
var h func(l *lookups, r []string) ([]byte, error)
switch s {
case motives:
i.key = []byte(keyForMotives(r[0]))
h = func(_ *lookups, r []string) ([]byte, error) {
return []byte(r[1]), nil
}
i.key = []byte(keyForLookup(motives, r[0]))
h = lookupHandler
case cities:
i.key = []byte(keyForLookup(cities, r[0]))
h = lookupHandler
case partners:
i.key = []byte(keyForPartners(r[0]))
h = loadPartnerRow
Expand All @@ -46,12 +49,13 @@ func newKVItem(s sourceType, l *lookups, r []string) (i item, err error) {
}

type badgerStorage struct {
db *badger.DB
path string
db *badger.DB
path string
gotError bool
}

func (kv *badgerStorage) load(dir string, l *lookups) error {
srcs, err := newSources(dir, []sourceType{motives, base, partners, taxes})
srcs, err := newSources(dir, []sourceType{motives, cities, base, partners, taxes})
if err != nil {
return fmt.Errorf("could not load sources: %w", err)
}
Expand Down Expand Up @@ -113,55 +117,107 @@ func (kv *badgerStorage) load(dir string, l *lookups) error {
}
}

type lookupResult struct {
value string
ok bool
}

func (kv *badgerStorage) lookupHandler(ch chan<- lookupResult, errs chan<- error, s sourceType, k *int) {
var v string
var ok bool
if k == nil {
ch <- lookupResult{v, ok}
return
}
v, ok, err := lookupOf(kv.db, s, *k)
if err != nil {
if !kv.gotError {
errs <- err
}
kv.gotError = true
return
}
ch <- lookupResult{v, ok}
}

type lookupChannels struct {
motives chan lookupResult
cities chan lookupResult
}

func (ls *lookupChannels) close() {
close(ls.motives)
close(ls.cities)
}

func (kv *badgerStorage) enrichCompany(c *company) error {
n := cnpj.Base(c.CNPJ)
ms := make(chan string)
errs := make(chan error)
defer close(errs)

ch := lookupChannels{make(chan lookupResult), make(chan lookupResult)}
defer ch.close()

go kv.lookupHandler(ch.motives, errs, motives, c.MotivoSituacaoCadastral)
go kv.lookupHandler(ch.cities, errs, cities, c.CodigoMunicipio)

ps := make(chan []partnerData)
bs := make(chan baseData)
ts := make(chan taxesData)
errs := make(chan error)
go func() {
if c.MotivoSituacaoCadastral == nil {
return
}
m, ok, err := motivesOf(kv.db, *c.MotivoSituacaoCadastral)
if err != nil {
errs <- err
return
}
if !ok {
return
}
ms <- m
defer func() {
close(ts)
close(bs)
close(ps)
}()
n := cnpj.Base(c.CNPJ)
go func() {
p, err := partnersOf(kv.db, n)
if err != nil {
errs <- err
if !kv.gotError {
errs <- err
}
kv.gotError = true
return
}
ps <- p
if !kv.gotError {
ps <- p
}
}()
go func() {
v, err := baseOf(kv.db, n)
if err != nil {
errs <- err
if !kv.gotError {
errs <- err
}
kv.gotError = true
return
}
bs <- v
if !kv.gotError {
bs <- v
}
}()
go func() {
t, err := taxesOf(kv.db, n)
if err != nil {
errs <- err
if !kv.gotError {
errs <- err
}
kv.gotError = true
return
}
ts <- t
if !kv.gotError {
ts <- t
}
}()
for i := 0; i < 3; i++ {
for i := 0; i < 5; i++ {
select {
case m := <-ms:
c.DescricaoMotivoSituacaoCadastral = &m
case r := <-ch.motives:
if r.ok {
c.DescricaoMotivoSituacaoCadastral = &r.value
}
case r := <-ch.cities:
if r.ok {
c.Municipio = &r.value
}
case p := <-ps:
c.QuadroSocietario = p
case v := <-bs:
Expand Down
12 changes: 3 additions & 9 deletions transform/lookups.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func newLookup(p string) (lookup, error) {
}

type lookups struct {
cities lookup
countries lookup
cnaes lookup
qualifications lookup
Expand All @@ -32,7 +31,7 @@ type lookups struct {

func newLookups(d string) (lookups, error) {
var ls []lookup
srcs := []sourceType{cities, countries, cnaes, qualifications, natures}
srcs := []sourceType{countries, cnaes, qualifications, natures}
for _, src := range srcs {
paths, err := pathsForSource(src, d)
if err != nil {
Expand All @@ -53,10 +52,10 @@ func newLookups(d string) (lookups, error) {
if err != nil {
return lookups{}, fmt.Errorf("error creating ibge lookup: %w", err)
}
return lookups{ls[0], ls[1], ls[2], ls[3], ls[4], c}, nil
return lookups{ls[0], ls[1], ls[2], ls[3], c}, nil
}

func (c *company) motivoSituacaoCadastral(l *lookups, v string) error {
func (c *company) motivoSituacaoCadastral(v string) error {
i, err := toInt(v)
if err != nil {
return fmt.Errorf("error trying to parse MotivoSituacaoCadastral %s: %w", v, err)
Expand Down Expand Up @@ -96,11 +95,6 @@ func (c *company) municipio(l *lookups, v string) error {
return nil
}
c.CodigoMunicipio = i
s, ok := l.cities[*i]
if !ok {
return nil
}
c.Municipio = &s
ibge, ok := l.ibge[*i]
if !ok {
log.Output(1, fmt.Sprintf("Could not find IBGE city code for %s-%s (%d)", *c.Municipio, c.UF, *i))
Expand Down

0 comments on commit 7ffb4cd

Please sign in to comment.