Skip to content

Commit

Permalink
Refactor and consolidate
Browse files Browse the repository at this point in the history
Remove file watcher
  • Loading branch information
maeb committed Dec 1, 2022
1 parent 97bbbca commit f77ad37
Show file tree
Hide file tree
Showing 29 changed files with 1,060 additions and 1,030 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
name: release

on:
pull_request:
branches:
- main
push:
branches:
- main
Expand All @@ -17,11 +14,14 @@ env:
jobs:
build:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write

steps:
- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@v3
uses: docker/metadata-action@v4
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
Expand Down
24 changes: 24 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ on:
permissions:
contents: read

env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}

jobs:
unit_test:
name: Golang unit tests
Expand Down Expand Up @@ -44,3 +48,23 @@ jobs:
version: latest
# Enable additional linters (see: https://golangci-lint.run/usage/linters/)
args: -E "bodyclose" -E "dogsled" -E "durationcheck" -E "errorlint" -E "forcetypeassert" -E "noctx" -E "exhaustive" -E "exportloopref" --timeout 3m0s
build:
runs-on: ubuntu-latest
steps:
- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@v4
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=semver,pattern={{version}}
type=ref,event=branch
type=ref,event=pr
- name: Build and push Docker image
uses: docker/build-push-action@v3
with:
build-args: |
VERSION=${{ steps.meta.outputs.version }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
154 changes: 89 additions & 65 deletions cmd/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
package index

import (
"context"
"fmt"
"os"
"os/signal"
"regexp"
"runtime"
"syscall"
"time"

"github.com/bits-and-blooms/bloom/v3"
Expand All @@ -34,7 +38,7 @@ import (

func NewCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "index [dir] ...",
Use: "index",
Short: "Index warc file(s)",
PreRunE: func(cmd *cobra.Command, args []string) error {
if err := viper.BindPFlags(cmd.Flags()); err != nil {
Expand All @@ -44,55 +48,51 @@ func NewCommand() *cobra.Command {
},
RunE: indexCmd,
}
// defaults
format := "cdxj"
indexDepth := 4
indexWorkers := 8
badgerDir := "./warcdb"
badgerBatchMaxSize := 1000
badgerBatchMaxWait := 5 * time.Second
badgerCompression := "snappy"
badgerDatabase := ""
var tikvPdAddr []string
tikvBatchMaxSize := 1000
tikvBatchMaxWait := 5 * time.Second
tikvDatabase := ""
bloomCapacity := uint(1000)
bloomFp := 0.01

cmd.Flags().StringP("format", "f", format, `index format: "cdxj", "cdxpb", "badger", "tikv" or "toc"`)
cmd.Flags().StringSlice("include", nil, "only include files matching these regular expressions")
cmd.Flags().StringSlice("exclude", nil, "exclude files matching these regular expressions")
cmd.Flags().IntP("max-depth", "d", indexDepth, "maximum directory recursion")
cmd.Flags().Int("workers", indexWorkers, "number of files")
cmd.Flags().StringSlice("dirs", nil, "directories to search for warc files in")
cmd.Flags().Uint("toc-bloom-capacity", bloomCapacity, "estimated bloom filter capacity")
cmd.Flags().Float64("toc-bloom-fp", bloomFp, "estimated bloom filter false positive rate")
cmd.Flags().String("badger-dir", badgerDir, "path to index database")
cmd.Flags().String("badger-database", badgerDatabase, "name of badger database")
cmd.Flags().Int("badger-batch-max-size", badgerBatchMaxSize, "max transaction batch size in badger")
cmd.Flags().Duration("badger-batch-max-wait", badgerBatchMaxWait, "max wait time before flushing batched records")
cmd.Flags().String("badger-compression", badgerCompression, "compression algorithm")
cmd.Flags().StringSlice("tikv-pd-addr", tikvPdAddr, "host:port of TiKV placement driver")
cmd.Flags().Int("tikv-batch-max-size", tikvBatchMaxSize, "max transaction batch size")
cmd.Flags().Duration("tikv-batch-max-wait", tikvBatchMaxWait, "max wait time before flushing batched records regardless of max batch size")
cmd.Flags().String("tikv-database", tikvDatabase, "name of tikv database")
// index options
cmd.Flags().StringP("index-source", "s", "file", `index source: "file" or "kafka"`)
cmd.Flags().StringP("index-format", "o", "cdxj", `index format: "cdxj", "cdxpb", "toc", "badger", "tikv"`)
cmd.Flags().StringSlice("index-include", nil, "only include files matching these regular expressions")
cmd.Flags().StringSlice("index-exclude", nil, "exclude files matching these regular expressions")
cmd.Flags().Int("index-workers", 8, "number of index workers")

// auto indexer options
cmd.Flags().StringSliceP("file-paths", "f", []string{"./testdata"}, "directories to search for warc files in")
cmd.Flags().Int("file-max-depth", 4, "maximum directory recursion")

// kafka indexer options
cmd.Flags().StringSlice("kafka-brokers", nil, "the list of broker addresses used to connect to the kafka cluster")
cmd.Flags().String("kafka-group-id", "", "optional consumer group id")
cmd.Flags().String("kafka-topic", "", "the topic to read messages from")
cmd.Flags().Int("kafka-min-bytes", 0, "indicates to the broker the minimum batch size that the consumer will accept")
cmd.Flags().Int("kafka-max-bytes", 0, "indicates to the broker the maximum batch size that the consumer will accept")
cmd.Flags().Duration("kafka-max-wait", 0, "maximum amount of time to wait for new data to come when fetching batches of messages from kafka")

// toc indexer options
cmd.Flags().Uint("toc-bloom-capacity", uint(1000), "estimated bloom filter capacity")
cmd.Flags().Float64("toc-bloom-fp", 0.01, "estimated bloom filter false positive rate")

// badger options
cmd.Flags().String("badger-dir", "./warcdb", "path to index database")
cmd.Flags().String("badger-database", "", "name of badger database")
cmd.Flags().Int("badger-batch-max-size", 1000, "max transaction batch size in badger")
cmd.Flags().Duration("badger-batch-max-wait", 5*time.Second, "max wait time before flushing batched records")
cmd.Flags().String("badger-compression", "snappy", "compression algorithm")
cmd.Flags().Bool("badger-read-only", false, "run badger in read-only mode")

// tikv options
cmd.Flags().StringSlice("tikv-pd-addr", nil, "host:port of TiKV placement driver")
cmd.Flags().Int("tikv-batch-max-size", 1000, "max transaction batch size")
cmd.Flags().Duration("tikv-batch-max-wait", 5*time.Second, "max wait time before flushing batched records regardless of max batch size")
cmd.Flags().String("tikv-database", "", "name of tikv database")

return cmd
}

func indexCmd(_ *cobra.Command, args []string) error {
// collect paths from args or flag
var dirs []string
if len(args) > 0 {
dirs = append(dirs, args...)
} else {
dirs = viper.GetStringSlice("dirs")
}

var w index.Indexer
func indexCmd(_ *cobra.Command, _ []string) error {
var w index.RecordWriter

format := viper.GetString("format")
switch format {
indexFormat := viper.GetString("index-format")
switch indexFormat {
case "cdxj":
w = index.CdxJ{}
case "cdxpb":
Expand Down Expand Up @@ -135,14 +135,11 @@ func indexCmd(_ *cobra.Command, args []string) error {
BloomFilter: bloom.NewWithEstimates(viper.GetUint("toc-bloom-capacity"), viper.GetFloat64("toc-bloom-fp")),
}
default:
return fmt.Errorf("unsupported format %s", format)
return fmt.Errorf("unknown index format: %s", indexFormat)
}

indexWorker := index.NewWorker(w, viper.GetInt("workers"))
defer indexWorker.Close()

var includes []*regexp.Regexp
for _, r := range viper.GetStringSlice("include") {
for _, r := range viper.GetStringSlice("index-include") {
if re, err := regexp.Compile(r); err != nil {
return fmt.Errorf("%s: %w", r, err)
} else {
Expand All @@ -151,31 +148,58 @@ func indexCmd(_ *cobra.Command, args []string) error {
}

var excludes []*regexp.Regexp
for _, r := range viper.GetStringSlice("exclude") {
for _, r := range viper.GetStringSlice("index-exclude") {
if re, err := regexp.Compile(r); err != nil {
return fmt.Errorf("%s: %w", r, err)
} else {
excludes = append(excludes, re)
}
}

log.Info().Msg("Starting auto indexer")
indexer, err := index.NewAutoIndexer(indexWorker,
index.WithMaxDepth(viper.GetInt("max-depth")),
indexer := index.NewIndexer(w,
index.WithIncludes(includes...),
index.WithExcludes(excludes...),
)
if err != nil {
return err
}
defer indexer.Close()
queue := index.NewWorkQueue(indexer,
viper.GetInt("index-workers"),
)
defer queue.Close()

for _, dir := range dirs {
err := indexer.Index(dir)
if err != nil {
log.Warn().Msgf(`Error indexing "%s": %v`, dir, err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigs
log.Info().Msgf("Received %s signal, shutting down...", sig)
cancel()
}()

var runner index.Runner

indexSource := viper.GetString("index-source")
switch indexSource {
case "file":
runner = index.NewAutoIndexer(queue,
index.WithMaxDepth(viper.GetInt("file-max-depth")),
index.WithPaths(viper.GetStringSlice("file-paths")),
index.WithExcludeDirs(excludes...),
)
case "kafka":
runner = index.NewKafkaIndexer(queue,
index.WithBrokers(viper.GetStringSlice("kafka-brokers")),
index.WithGroupID(viper.GetString("kafka-group-id")),
index.WithTopic(viper.GetString("kafka-topic")),
index.WithMinBytes(viper.GetInt("kafka-min-bytes")),
index.WithMaxBytes(viper.GetInt("kafka-max-bytes")),
index.WithMaxWait(viper.GetDuration("kafka-max-wait")),
)
default:
return fmt.Errorf("unknown index source: %s", indexSource)
}

return nil
log.Info().Msg("Starting indexer")

return runner.Run(ctx)
}
29 changes: 0 additions & 29 deletions cmd/index/index_test.go

This file was deleted.

18 changes: 9 additions & 9 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ import (

// NewCommand returns a new cobra.Command implementing the root command for gowarcserver.
func NewCommand() *cobra.Command {
cobra.OnInitialize(func() { initConfig() })
cobra.OnInitialize(initConfig)

cmd := &cobra.Command{
Use: "gowarcserver",
Short: "gowarcserver is a tool for indexing and serving WARC files",
}

// Global flags
_ = cmd.PersistentFlags().StringP("config", "c", "", `path to config file, default paths are "./config.yaml", "$HOME/.gowarcserver/config.yaml" or "/etc/gowarcserver/config.yaml"`)
_ = cmd.PersistentFlags().String("config", "", `path to config file, default paths are "./config.yaml", "$HOME/.gowarcserver/config.yaml" or "/etc/gowarcserver/config.yaml"`)
_ = cmd.PersistentFlags().StringP("log-level", "l", "info", `set log level, available levels are "panic", "fatal", "error", "warn", "info", "debug" and "trace"`)
_ = cmd.PersistentFlags().String("log-formatter", "logfmt", "log formatter, available values are logfmt and json")
_ = cmd.PersistentFlags().Bool("log-method", false, "log method caller")
Expand Down Expand Up @@ -76,14 +76,14 @@ func initConfig() {
viper.AddConfigPath("/etc/gowarcserver/") // global configuration directory
}

if err := viper.ReadInConfig(); err != nil {
if errors.As(err, new(viper.ConfigFileNotFoundError)) {
return
}
defer func() {
logger.InitLog(viper.GetString("log-level"), viper.GetString("log-formatter"), viper.GetBool("log-method"))
log.Debug().Msgf("Using config file: %s", viper.ConfigFileUsed())
}()

err := viper.ReadInConfig()
if err != nil && !errors.As(err, new(viper.ConfigFileNotFoundError)) {
_, _ = fmt.Fprintf(os.Stderr, "Failed to read config file: %v", err)
os.Exit(1)
}
logger.InitLog(viper.GetString("log-level"), viper.GetString("log-formatter"), viper.GetBool("log-method"))

log.Info().Msgf("Using config file: %s", viper.ConfigFileUsed())
}
16 changes: 16 additions & 0 deletions cmd/root_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2022 National Library of Norway.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cmd

import (
Expand Down
Loading

0 comments on commit f77ad37

Please sign in to comment.