Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 47f25999-dd5e-4636-8c39-e7cea2453331
dockerImageTag: 2.8.13
dockerImageTag: 2.9.0-rc.1
dockerRepository: airbyte/source-bing-ads
documentationUrl: https://docs.airbyte.com/integrations/sources/bing-ads
erdUrl: https://dbdocs.io/airbyteio/source-bing-ads?view=relationships
Expand All @@ -36,6 +36,8 @@ data:
enabled: true
releaseStage: generally_available
releases:
rolloutConfiguration:
enableProgressiveRollout: true
breakingChanges:
1.0.0:
message: Version 1.0.0 removes the primary keys from the geographic performance report streams. This will prevent the connector from losing data in the incremental append+dedup sync mode because of deduplication and incorrect primary keys. A data reset and schema refresh of all the affected streams is required for the changes to take effect.
Expand Down
678 changes: 410 additions & 268 deletions airbyte-integrations/connectors/source-bing-ads/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.8.13"
version = "2.9.0-rc.1"
name = "source-bing-ads"
description = "Source implementation for Bing Ads."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -19,8 +19,9 @@ include = "source_bing_ads"
python = "^3.10,<3.12"
bingads = "==13.0.18.1"
urllib3 = "==1.26.18"
airbyte-cdk = "^5"
airbyte-cdk = "^6"
cached-property = "==1.5.2"
pendulum = "<3.0.0"

[tool.poetry.scripts]
source-bing-ads = "source_bing_ads.run:run"
Expand All @@ -29,7 +30,7 @@ source-bing-ads = "source_bing_ads.run:run"
freezegun = "^1.4.0"
pytest-mock = "^3.6.1"
pytest = "^8.0.0"
requests-mock = "^1.9.3"
requests-mock = "^1.12.1"


[tool.poe]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@
from numpy import nan

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import IncrementalMixin
from airbyte_cdk.sources.streams import CheckpointMixin
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from source_bing_ads.base_streams import Accounts, BingAdsBaseStream
from source_bing_ads.utils import transform_bulk_datetime_format_to_rfc_3339


class BingAdsBulkStream(BingAdsBaseStream, IncrementalMixin, ABC):
class BingAdsBulkStream(BingAdsBaseStream, CheckpointMixin, ABC):
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization)
cursor_field = "Modified Time"
primary_key = "Id"
_state = {}

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._state = {}

@staticmethod
@transformer.registerCustomTransform
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# Declarative (low-code) manifest for the Bing Ads source.
# Defines three streams chained through substream partition routers:
#   users -> accounts (one partition per user Id) -> campaigns (one partition per account Id).
version: 6.7.0

type: DeclarativeSource

# The connection check reads from the accounts stream.
check:
  type: CheckStream
  stream_names:
    - accounts

definitions:
  # OAuth refresh-token flow against the Microsoft identity platform, shared by every stream.
  authenticator:
    type: OAuthAuthenticator
    # Extra form fields sent alongside the refresh-token request.
    refresh_request_body:
      environment: "production"
      oauth_scope: "msads.manage"
      scope: "https://ads.microsoft.com/msads.manage offline_access"
      tenant: "{{ config['tenant_id'] }}"
    token_refresh_endpoint: 'https://login.microsoftonline.com/{{ config["tenant_id"] }}/oauth2/v2.0/token'
    grant_type: refresh_token
    client_id: '{{ config["client_id"] }}'
    client_secret: '{{ config["client_secret"] }}'
    refresh_token: '{{ config["refresh_token"] }}'

  # Parent stream for accounts: each user record's Id becomes an accounts partition.
  users_stream:
    type: DeclarativeStream
    name: users
    primary_key: Id
    schema_loader:
      type: InlineSchemaLoader
      schema: # this does not matter as we don't expose the stream as public
        type: object
        $schema: http://json-schema.org/draft-07/schema#
        additionalProperties: true
        properties: {}
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: https://clientcenter.api.bingads.microsoft.com/CustomerManagement/v13/User/Query
        http_method: POST
        request_headers:
          Content-Type: application/json
          DeveloperToken: "{{ config['developer_token'] }}"
        # Raw JSON body with a null UserId.
        # NOTE(review): presumably null selects the authenticated user - confirm against the API docs.
        request_body_data: '{"UserId": null}'
        authenticator: "#/definitions/authenticator"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["User"]
        schema_normalization: Default

  # Accounts visible to each user; paginated through PageInfo.Index in the request body.
  accounts_stream:
    type: DeclarativeStream
    name: accounts
    primary_key: Id
    schema_loader:
      type: JsonFileSchemaLoader
      file_path: "./source_bing_ads/schemas/accounts.json"
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: https://clientcenter.api.bingads.microsoft.com/CustomerManagement/v13/Accounts/Search
        http_method: POST
        request_headers:
          Content-Type: application/json
          DeveloperToken: "{{ config['developer_token'] }}"
        request_body_json:
          PageInfo:
            # The paginator defines no page_token_option; the page index is injected here instead.
            Index: "{{ next_page_token.next_page_token }}"
            Size: 1000
          Predicates:
            - Field: UserId
              Operator: Equals
              # The inner single quotes are part of the rendered value.
              # NOTE(review): presumably there to keep the id a string after interpolation - confirm.
              Value: "'{{ stream_partition['user_id'] }}'"
          ReturnAdditionalFields: TaxCertificate,AccountMode
        authenticator: "#/definitions/authenticator"
      paginator:
        type: DefaultPaginator
        pagination_strategy:
          type: PageIncrement
          inject_on_first_request: true
          page_size: 1000
      partition_router:
        type: SubstreamPartitionRouter
        parent_stream_configs:
          - type: ParentStreamConfig
            parent_key: Id
            partition_field: user_id
            stream:
              $ref: "#/definitions/users_stream"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["Accounts"]
        schema_normalization: Default
    transformations:
      - type: AddFields
        fields:
          # NOTE(review): the campaigns partition router in this manifest keys on `Id` and does
          # not read this field - confirm whether it is still required.
          - path:
              - _airbyte_parent_id # used for campaigns_stream
            value: "{{ { 'account_id': record['Id'], 'customer_id': record['ParentCustomerId'] } }}"

  # Campaigns per account. Requests need both the account id (the partition value) and the
  # owning customer id (carried from the parent account record through extra_fields).
  campaigns_stream:
    type: DeclarativeStream
    name: campaigns
    primary_key: Id
    schema_loader:
      type: JsonFileSchemaLoader
      file_path: "./source_bing_ads/schemas/campaigns.json"
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: https://campaign.api.bingads.microsoft.com/CampaignManagement/v13/Campaigns/QueryByAccountId
        http_method: POST
        request_headers:
          Content-Type: application/json
          DeveloperToken: "{{ config['developer_token'] }}"
          # The partition only contains account_id, so the customer id is read from extra_fields.
          CustomerId: "'{{ stream_slice.extra_fields['ParentCustomerId'] }}'"
          CustomerAccountId: "'{{ stream_partition['account_id'] }}'"
        request_body_json:
          AccountId: "'{{ stream_partition['account_id'] }}'"
          CampaignType: Audience,DynamicSearchAds,Search,Shopping,PerformanceMax
          ReturnAdditionalFields: AdScheduleUseSearcherTimeZone,BidStrategyId,CpvCpmBiddingScheme,DynamicDescriptionSetting,DynamicFeedSetting,MaxConversionValueBiddingScheme,MultimediaAdsBidAdjustment,TargetImpressionShareBiddingScheme,TargetSetting,VerifiedTrackingSetting
        authenticator: "#/definitions/authenticator"
      partition_router:
        type: SubstreamPartitionRouter
        parent_stream_configs:
          - type: ParentStreamConfig
            parent_key: Id
            partition_field: account_id
            # Expose the parent account's ParentCustomerId on every slice.
            extra_fields:
              - - ParentCustomerId
            stream:
              $ref: "#/definitions/accounts_stream"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["Campaigns"]
        schema_normalization: Default
    transformations: # FIXME missing transformations for Settings and Languages
      - type: AddFields
        fields:
          - path:
              - AccountId
            value: "{{ stream_slice.account_id }}"
      - type: AddFields
        fields:
          - path:
              - CustomerId
            value: "{{ stream_slice.extra_fields['ParentCustomerId'] }}"

# NOTE(review): users is listed here although its schema comment says the stream is not exposed
# publicly - presumably it is filtered out elsewhere; confirm.
streams:
  - $ref: "#/definitions/users_stream"
  - $ref: "#/definitions/accounts_stream"
  - $ref: "#/definitions/campaigns_stream"

concurrency_level:
  type: ConcurrencyLevel
  default_concurrency: 2
  max_concurrency: 10
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,52 @@


import sys
import traceback
from datetime import datetime
from typing import List

from airbyte_cdk.entrypoint import launch
from orjson import orjson

from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch, logger
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteMessageSerializer, AirbyteTraceMessage, TraceType, Type
from source_bing_ads import SourceBingAds


def _get_source(args: List[str]):
    """Build the Bing Ads source from the CLI arguments.

    The catalog, config and state paths are extracted from ``args``; each one
    that is present is read and passed to ``SourceBingAds``, otherwise ``None``
    is passed in its place.

    Args:
        args: Command-line arguments, without the program name.

    Returns:
        The constructed ``SourceBingAds``, or ``None`` when construction raised.
        In that case an Airbyte ERROR trace message has already been printed
        to stdout.
    """
    catalog_path = AirbyteEntrypoint.extract_catalog(args)
    config_path = AirbyteEntrypoint.extract_config(args)
    state_path = AirbyteEntrypoint.extract_state(args)
    try:
        return SourceBingAds(
            SourceBingAds.read_catalog(catalog_path) if catalog_path else None,
            SourceBingAds.read_config(config_path) if config_path else None,
            SourceBingAds.read_state(state_path) if state_path else None,
        )
    except Exception as error:
        # Deliberately broad: any start-up failure is reported as a serialized trace
        # message on stdout instead of propagating as a raw exception.
        print(
            orjson.dumps(
                AirbyteMessageSerializer.dump(
                    AirbyteMessage(
                        type=Type.TRACE,
                        trace=AirbyteTraceMessage(
                            type=TraceType.ERROR,
                            # Epoch timestamp in milliseconds.
                            emitted_at=int(datetime.now().timestamp() * 1000),
                            error=AirbyteErrorTraceMessage(
                                message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
                                stack_trace=traceback.format_exc(),
                            ),
                        ),
                    )
                )
            ).decode()
        )
        return None


def run() -> None:
    """Connector entry point: install the uncaught-exception handler, then launch the source.

    Nothing is launched when the source could not be constructed; the failure
    has already been reported by ``_get_source``.
    """
    init_uncaught_exception_handler(logger)
    _args = sys.argv[1:]
    source = _get_source(_args)
    if source:
        launch(source, _args)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"properties": {
"Id": {
"description": "ID of the account",
"type": ["null", "number"]
"type": ["null", "integer"]
},
"AccountFinancialStatus": {
"description": "The financial status of the account",
Expand Down Expand Up @@ -36,15 +36,18 @@
},
"LinkedAgencies": {
"description": "The agencies linked to the account for management purposes.",
"type": ["null", "object"],
"properties": {
"Id": {
"description": "ID of the linked agency",
"type": ["null", "integer"]
},
"Name": {
"description": "Name of the linked agency",
"type": ["null", "string"]
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"Id": {
"description": "ID of the linked agency",
"type": ["null", "integer"]
},
"Name": {
"description": "Name of the linked agency",
"type": ["null", "string"]
}
}
}
},
Expand Down Expand Up @@ -74,7 +77,7 @@
},
"Id": {
"description": "ID of the business address",
"type": ["null", "number"]
"type": ["null", "integer"]
},
"Line1": {
"description": "Address line 1",
Expand Down Expand Up @@ -112,19 +115,19 @@
},
"BackUpPaymentInstrumentId": {
"description": "ID of the backup payment instrument",
"type": ["null", "number"]
"type": ["null", "integer"]
},
"BillingThresholdAmount": {
"description": "The threshold amount for billing",
"type": ["null", "number"]
},
"BillToCustomerId": {
"description": "Customer ID for billing",
"type": ["null", "number"]
"type": ["null", "integer"]
},
"LastModifiedByUserId": {
"description": "ID of the user who last modified the account",
"type": ["null", "number"]
"type": ["null", "integer"]
},
"LastModifiedTime": {
"description": "The date and time of the last modification",
Expand All @@ -142,27 +145,27 @@
},
"ParentCustomerId": {
"description": "ID of the parent customer",
"type": ["null", "number"]
"type": ["null", "integer"]
},
"PauseReason": {
"description": "Reason for pausing the account",
"type": ["null", "number"]
"type": ["null", "integer"]
},
"PaymentMethodId": {
"description": "ID of the payment method",
"type": ["null", "number"]
"type": ["null", "integer"]
},
"PrimaryUserId": {
"description": "ID of the primary user",
"type": ["null", "number"]
"type": ["null", "integer"]
},
"SalesHouseCustomerId": {
"description": "Customer ID for sales house",
"type": ["null", "number"]
"type": ["null", "integer"]
},
"SoldToPaymentInstrumentId": {
"description": "ID of the payment instrument for sales",
"type": ["null", "number"]
"type": ["null", "integer"]
},
"TimeStamp": {
"description": "Timestamp of the account",
Expand Down
Loading
Loading