Skip to content

Commit

Permalink
Batch size update limits (#916)
Browse files Browse the repository at this point in the history
* adds batch size limits

* bump
  • Loading branch information
ieaves committed Apr 30, 2024
1 parent 6d07538 commit 9f2e423
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 18 deletions.
1 change: 0 additions & 1 deletion grai-server/app/connections/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ def get_nodes_and_edges(self):
def run_update(self):
nodes, edges = self.integration.get_nodes_and_edges()
capture_quarantined_errors(self.integration, self.run)

update(self.run.workspace, self.run.source, nodes)
update(self.run.workspace, self.run.source, edges)

Expand Down
36 changes: 29 additions & 7 deletions grai-server/app/connections/task_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
from lineage.models import Node as NodeModel
from lineage.models import Source
from workspaces.models import Workspace

from itertools import islice
from typing import Any, Callable, Iterable, Iterator, Optional

from django.db import transaction
from pympler import asizeof

from .adapters.schemas import model_to_schema, schema_to_model


class NameNamespace(Protocol):
Expand Down Expand Up @@ -279,6 +281,21 @@ def process_updates(
return new, old, updated


def create_batches(
    data: Iterable[Any],
    threshold_size: int = 500 * 1024 * 1024,
    size_of: Optional[Callable[[Any], int]] = None,
) -> Iterator[list]:
    """Lazily split ``data`` into consecutive batches bounded by total size.

    Items are kept in their original order. A batch is closed as soon as
    adding the next item would push its accumulated size above
    ``threshold_size``.

    Args:
        data: The items to group. Consumed once, front to back.
        threshold_size: Maximum accumulated size of one batch, in whatever
            unit ``size_of`` reports. The default is 500 * 1024 * 1024.
        size_of: Callable returning the size of a single item. When omitted,
            ``pympler.asizeof.asizeof`` is used.

    Yields:
        Non-empty lists of items. A single item whose size alone exceeds
        ``threshold_size`` is still emitted, in a batch of its own, so no
        item is ever dropped.
    """
    if size_of is None:
        size_of = asizeof.asizeof

    batch = []
    current_batch_size = 0
    for item in data:
        item_size = size_of(item)
        # Only flush a non-empty batch: an oversized first item must still be
        # placed somewhere, and yielding an empty list would be useless to
        # callers.
        if batch and current_batch_size + item_size > threshold_size:
            yield batch
            batch = []
            current_batch_size = 0
        batch.append(item)
        current_batch_size += item_size
    if batch:
        yield batch


def update(
workspace: Workspace,
source: Source,
Expand All @@ -290,16 +307,21 @@ def update(

source, _ = Source.objects.get_or_create(id=source.id, name=source.name, workspace=workspace)

item_types = items[0].type
Model = NodeModel if item_types in ["Node", "SourceNode"] else EdgeModel
relationship = source.nodes if item_types in ["Node", "SourceNode"] else source.edges
is_node = items[0].type in ["Node", "SourceNode"]
Model = NodeModel if is_node else EdgeModel
relationship = source.nodes if is_node else source.edges

new_items, deactivated_items, updated_items = process_updates(workspace, source, items, active_items)

Model.objects.bulk_create(new_items)
Model.objects.bulk_update(updated_items, ["metadata"])
# relationship creation can be improved with a switch to a bulk_create on the through entity
# https://stackoverflow.com/questions/68422898/efficiently-bulk-updating-many-manytomany-fields
# Insert brand-new rows in size-bounded batches so that a single statement
# never has to carry the whole payload.
for batch in create_batches(new_items):
    Model.objects.bulk_create(batch)
# Persist metadata changes for the rows that already existed. This must
# iterate `updated_items`: batching `new_items` here would re-write the rows
# that were just created and silently skip every real update.
for batch in create_batches(updated_items):
    Model.objects.bulk_update(batch, ["metadata"])

relationship.add(*new_items, *updated_items)
with transaction.atomic():
relationship.add(*new_items, *updated_items)

if len(deactivated_items) > 0:
relationship.remove(*deactivated_items)
Expand Down
13 changes: 12 additions & 1 deletion grai-server/app/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion grai-server/app/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "the_guide"
version = "0.1.60"
version = "0.1.69"
description = ""
authors = ["Grai <[email protected]>"]
license = "Elastic-2.0"
Expand Down Expand Up @@ -78,6 +78,7 @@ pyjwt = "^2.8.0"
django-csp = "^3.7"
multimethod = "^1.4.0, !=1.11.0"
grai-source-cube = "^0.0.2"
pympler = "^1.0.1"


[tool.poetry.group.dev.dependencies]
Expand Down
2 changes: 1 addition & 1 deletion grai-server/app/the_guide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__version__ = "0.1.60"
__version__ = "0.1.69"
__all__ = ("celery_app", "__version__")
13 changes: 6 additions & 7 deletions grai-server/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ services:
build:
context: ./app
target: grai-server
volumes:
- ./app/:/usr/src/app/
# volumes:
# - ./app/:/usr/src/app/
ports:
- 8000:8000
environment:
Expand All @@ -33,8 +33,8 @@ services:
build:
context: ./app
target: grai-worker
volumes:
- ./app/:/usr/src/app/
# volumes:
# - ./app/:/usr/src/app/
environment:
- DB_HOST=db
- REDIS_HOST=redis
Expand All @@ -51,8 +51,8 @@ services:
build:
context: ./app
target: grai-beat-worker
volumes:
- ./app/:/usr/src/app/
# volumes:
# - ./app/:/usr/src/app/
environment:
- DB_HOST=db
- REDIS_HOST=redis
Expand Down Expand Up @@ -114,7 +114,6 @@ services:
- 5557:5555
environment:
- CELERY_BROKER_URL=redis://redis:6379/0

depends_on:
- redis
- db
Expand Down

0 comments on commit 9f2e423

Please sign in to comment.