Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add support for sharding #46

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions internal/sql/GitMaterial.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
SOURCE_TYPE_WEBHOOK SourceType = "WEBHOOK"
)

//TODO: add support for submodule
// TODO: add support for submodule
type GitMaterial struct {
tableName struct{} `sql:"git_material"`
Id int `sql:"id,pk"`
Expand All @@ -53,10 +53,13 @@ type GitMaterial struct {
}

type MaterialRepository interface {
GetConnection() *pg.DB
FindById(id int) (*GitMaterial, error)
Update(material *GitMaterial) error
Save(material *GitMaterial) error
SaveWithTransaction(material *GitMaterial, tx *pg.Tx) error
FindActive() ([]*GitMaterial, error)
FindActiveForOrdinalIndex(ordinalIndex int) ([]*GitMaterial, error)
FindAll() ([]*GitMaterial, error)
FindAllActiveByUrls(urls []string) ([]*GitMaterial, error)
}
Expand All @@ -68,11 +71,20 @@ func NewMaterialRepositoryImpl(dbConnection *pg.DB) *MaterialRepositoryImpl {
return &MaterialRepositoryImpl{dbConnection: dbConnection}
}

func (repo MaterialRepositoryImpl) GetConnection() *pg.DB {
return repo.dbConnection
}

func (repo MaterialRepositoryImpl) Save(material *GitMaterial) error {
err := repo.dbConnection.Insert(material)
return err
}

func (repo MaterialRepositoryImpl) SaveWithTransaction(material *GitMaterial, tx *pg.Tx) error {
err := tx.Insert(material)
return err
}

func (repo MaterialRepositoryImpl) Update(material *GitMaterial) error {
_, err := repo.dbConnection.Model(material).WherePK().Update()
return err
Expand All @@ -81,12 +93,28 @@ func (repo MaterialRepositoryImpl) Update(material *GitMaterial) error {
func (repo MaterialRepositoryImpl) FindActive() ([]*GitMaterial, error) {
var materials []*GitMaterial
err := repo.dbConnection.Model(&materials).
Column("git_material.*", "GitProvider", ).
Column("git_material.*", "GitProvider").
Relation("CiPipelineMaterials", func(q *orm.Query) (*orm.Query, error) {
return q.Where("active IS TRUE"), nil
}).
Where("deleted =? ", false).
Where("checkout_status=? ", true).
Order("id ASC").
Select()
return materials, err
}

func (repo MaterialRepositoryImpl) FindActiveForOrdinalIndex(ordinalIndex int) ([]*GitMaterial, error) {
var materials []*GitMaterial
err := repo.dbConnection.Model(&materials).
Column("git_material.*", "GitProvider").
Join("INNER JOIN git_material_node_mapping gmnp ON gmnp.git_material_id = git_material.id").
Relation("CiPipelineMaterials", func(q *orm.Query) (*orm.Query, error) {
return q.Where("active IS TRUE"), nil
}).
Where("deleted =? ", false).
Where("checkout_status=? ", true).
Where("gmnp.ordinal_index = ?", ordinalIndex).
Order("id ASC").
Select()
return materials, err
Expand All @@ -111,7 +139,7 @@ func (repo MaterialRepositoryImpl) FindById(id int) (*GitMaterial, error) {
return &material, err
}

func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls[] string) ([]*GitMaterial, error) {
func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls []string) ([]*GitMaterial, error) {
var materials []*GitMaterial
err := repo.dbConnection.Model(&materials).
Relation("CiPipelineMaterials", func(q *orm.Query) (*orm.Query, error) {
Expand Down
62 changes: 62 additions & 0 deletions internal/sql/GitMaterialNodeMappingRepository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2020 Devtron Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
"github.com/go-pg/pg"
"go.uber.org/zap"
"time"
)

type GitMaterialNodeMapping struct {
tableName struct{} `sql:"git_material_node_mapping" pg:",discard_unknown_columns"`
Id int `sql:"id,pk"`
GitMaterialId int `sql:"git_material_id"`
OrdinalIndex int `sql:"ordinal_index" pg:",use_zero"`
CreatedBy int `sql:"created_by"`
CreatedOn time.Time `sql:"created_on"`
UpdatedBy int `sql:"updated_by"`
UpdatedOn time.Time `sql:"updated_on"`
}

type GitMaterialNodeMappingRepository interface {
GetConnection() *pg.DB
InsertWithTransaction(mapping *GitMaterialNodeMapping, tx *pg.Tx) error
}

type GitMaterialNodeMappingRepositoryImpl struct {
logger *zap.SugaredLogger
dbConnection *pg.DB
}

func NewGitMaterialNodeMappingRepositoryImpl(logger *zap.SugaredLogger,
dbConnection *pg.DB) *GitMaterialNodeMappingRepositoryImpl {
return &GitMaterialNodeMappingRepositoryImpl{
logger: logger,
dbConnection: dbConnection,
}
}

func (impl *GitMaterialNodeMappingRepositoryImpl) GetConnection() *pg.DB {
return impl.dbConnection
}

func (impl *GitMaterialNodeMappingRepositoryImpl) InsertWithTransaction(mapping *GitMaterialNodeMapping, tx *pg.Tx) error {

err := tx.Insert(mapping)
return err
}
51 changes: 49 additions & 2 deletions pkg/RepoManages.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/caarlos0/env"
"github.com/devtron-labs/git-sensor/internal"
"github.com/devtron-labs/git-sensor/internal/sql"
"github.com/devtron-labs/git-sensor/internal/util"
"github.com/devtron-labs/git-sensor/pkg/git"
_ "github.com/robfig/cron/v3"
"go.uber.org/zap"
"strconv"
"strings"
"time"
)

type RepoManager interface {
Expand Down Expand Up @@ -65,6 +69,7 @@ type RepoManagerImpl struct {
webhookEventDataMappingRepository sql.WebhookEventDataMappingRepository
webhookEventDataMappingFilterResultRepository sql.WebhookEventDataMappingFilterResultRepository
webhookEventBeanConverter git.WebhookEventBeanConverter
nodeMappingRepository sql.GitMaterialNodeMappingRepository
}

func NewRepoManagerImpl(
Expand All @@ -79,6 +84,7 @@ func NewRepoManagerImpl(
webhookEventDataMappingRepository sql.WebhookEventDataMappingRepository,
webhookEventDataMappingFilterResultRepository sql.WebhookEventDataMappingFilterResultRepository,
webhookEventBeanConverter git.WebhookEventBeanConverter,
nodeMappingRepository sql.GitMaterialNodeMappingRepository,
) *RepoManagerImpl {
return &RepoManagerImpl{
logger: logger,
Expand All @@ -93,6 +99,7 @@ func NewRepoManagerImpl(
webhookEventDataMappingRepository: webhookEventDataMappingRepository,
webhookEventDataMappingFilterResultRepository: webhookEventDataMappingFilterResultRepository,
webhookEventBeanConverter: webhookEventBeanConverter,
nodeMappingRepository: nodeMappingRepository,
}
}

Expand Down Expand Up @@ -238,7 +245,7 @@ func (impl RepoManagerImpl) SaveGitProvider(provider *sql.GitProvider) (*sql.Git
return provider, err
}

//handle update
// handle update
func (impl RepoManagerImpl) AddRepo(materials []*sql.GitMaterial) ([]*sql.GitMaterial, error) {
for _, material := range materials {
_, err := impl.addRepo(material)
Expand Down Expand Up @@ -307,11 +314,51 @@ func (impl RepoManagerImpl) checkoutUpdatedRepo(materialId int) error {
}

func (impl RepoManagerImpl) addRepo(material *sql.GitMaterial) (*sql.GitMaterial, error) {
err := impl.materialRepository.Save(material)

// Get pod ordeal index
nodeConfig := &git.NodeConfig{}
err := env.Parse(nodeConfig)
if err != nil || nodeConfig.Hostname == "" {
return nil, err
}
s := strings.Split(nodeConfig.Hostname, "-")
ordinalIndex, err := strconv.Atoi(s[len(s)-1])

// begin transaction
tx, err := impl.materialRepository.GetConnection().Begin()
if err != nil {
impl.logger.Errorw("error starting transaction",
"err", err)
return material, err
}
defer tx.Rollback()

// save material
err = impl.materialRepository.SaveWithTransaction(material, tx)
if err != nil {
impl.logger.Errorw("error in saving material ", "material", material, "err", err)
return material, err
}

// save mapping
err = impl.nodeMappingRepository.InsertWithTransaction(&sql.GitMaterialNodeMapping{
OrdinalIndex: ordinalIndex,
GitMaterialId: material.Id,
CreatedBy: 1,
CreatedOn: time.Now(),
UpdatedBy: 1,
UpdatedOn: time.Now(),
}, tx)
if err != nil {
impl.logger.Errorw("error while storing mapping",
"err", err)
return material, err
}

err = tx.Commit()
if err != nil {
return nil, err
}
return impl.checkoutRepo(material)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/git/Bean.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"time"
)

type NodeConfig struct {
Hostname string `env:"HOSTNAME" envDefault:""`
}

type FetchScmChangesRequest struct {
PipelineMaterialId int `json:"pipelineMaterialId"`
From string `json:"from"`
Expand Down
16 changes: 15 additions & 1 deletion pkg/git/Watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/gammazero/workerpool"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -98,7 +100,19 @@ func (impl GitWatcherImpl) StopCron() {
}
func (impl GitWatcherImpl) Watch() {
impl.logger.Infow("starting git watch thread")
materials, err := impl.materialRepo.FindActive()

// Get pod ordeal index
nodeConfig := &NodeConfig{}
err := env.Parse(nodeConfig)
if err != nil || nodeConfig.Hostname == "" {
impl.logger.Errorw("error determining ordinal index",
"err", err)
return
}
s := strings.Split(nodeConfig.Hostname, "-")
ordinalIndex, err := strconv.Atoi(s[len(s)-1])

materials, err := impl.materialRepo.FindActiveForOrdinalIndex(ordinalIndex)
if err != nil {
impl.logger.Error("error in fetching watchlist", "err", err)
return
Expand Down
5 changes: 5 additions & 0 deletions scripts/sql/10_sharding.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Git Sensor node mapping table
DROP TABLE IF EXISTS "public"."git_material_node_mapping" CASCADE;

---- DROP sequence
DROP SEQUENCE IF EXISTS public.id_seq_git_material_node_mapping;
13 changes: 13 additions & 0 deletions scripts/sql/10_sharding.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Sequence and defined type
CREATE SEQUENCE IF NOT EXISTS id_seq_git_material_node_mapping;

-- Create GitSensorNode mapping table
CREATE TABLE git_material_node_mapping(
"id" INTEGER PRIMARY KEY DEFAULT nextval('id_seq_git_material_node_mapping'::regclass),
"git_material_id" INTEGER NOT NULL,
"ordinal_index" INTEGER NOT NULL DEFAULT 0,
"created_on" TIMESTAMPTZ,
"created_by" INTEGER,
"updated_on" TIMESTAMPTZ,
"updated_by" INTEGER
);
2 changes: 2 additions & 0 deletions wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func InitializeApp() (*App, error) {
wire.Bind(new(git.WebhookEventParser), new(*git.WebhookEventParserImpl)),
git.NewWebhookHandlerImpl,
wire.Bind(new(git.WebhookHandler), new(*git.WebhookHandlerImpl)),
sql.NewGitMaterialNodeMappingRepositoryImpl,
wire.Bind(new(sql.GitMaterialNodeMappingRepository), new(*sql.GitMaterialNodeMappingRepositoryImpl)),
)
return &App{}, nil
}
3 changes: 2 additions & 1 deletion wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.