Skip to content

Commit

Permalink
use sqlalchemy generated insert statement
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhorvath committed Mar 22, 2022
1 parent 8a329d6 commit bf4a762
Showing 1 changed file with 36 additions and 1 deletion.
37 changes: 36 additions & 1 deletion target_oracle/sinks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Oracle target sink class, which handles writing streams."""

from typing import Any, Dict, Iterable, Optional

from singer_sdk.sinks import SQLSink
import sqlalchemy

Expand Down Expand Up @@ -31,4 +33,37 @@ def activate_version(self, new_version: int) -> None:

# temporary fix, parents don't seem to handle activate messages where table does not exist
if self.connector.table_exists(full_table_name=self.full_table_name):
super().activate_version(new_version)
super().activate_version(new_version)

def bulk_insert_records(
self,
full_table_name: str,
schema: dict,
records: Iterable[Dict[str, Any]],
) -> Optional[int]:
"""Bulk insert records to an existing destination table.
The default implementation uses a generic SQLAlchemy bulk insert operation.
This method may optionally be overridden by developers in order to provide
faster, native bulk uploads.
Args:
full_table_name: the target table name.
schema: the JSON schema for the new table, to be used when inferring column
names.
records: the input records.
Returns:
True if table exists, False if not, None if unsure or undetectable.
"""
table = self.connector.get_table(full_table_name)

self.connection.execute(
table.insert(),
records,
)

if isinstance(records, list):
return len(records) # If list, we can quickly return record count.

return None # Unknown record count.

0 comments on commit bf4a762

Please sign in to comment.