diff --git a/gcloud/bigquery/client.py b/gcloud/bigquery/client.py
index 5987796a676e..282d26ab76ba 100644
--- a/gcloud/bigquery/client.py
+++ b/gcloud/bigquery/client.py
@@ -102,6 +102,83 @@ def dataset(self, dataset_name):
         """
         return Dataset(dataset_name, client=self)
 
+    def _job_from_resource(self, resource):
+        """Detect correct job type from resource and instantiate.
+
+        Helper for :meth:`list_jobs`.
+
+        :type resource: dict
+        :param resource: one job resource from API response
+
+        :rtype: one of:
+                :class:`gcloud.bigquery.job.LoadTableFromStorageJob`,
+                :class:`gcloud.bigquery.job.CopyJob`,
+                :class:`gcloud.bigquery.job.ExtractTableToStorageJob`,
+                :class:`gcloud.bigquery.job.QueryJob`
+        :returns: the job instance, constructed via the resource
+        """
+        config = resource['configuration']
+        if 'load' in config:
+            return LoadTableFromStorageJob.from_api_repr(resource, self)
+        elif 'copy' in config:
+            return CopyJob.from_api_repr(resource, self)
+        elif 'extract' in config:
+            return ExtractTableToStorageJob.from_api_repr(resource, self)
+        elif 'query' in config:
+            return QueryJob.from_api_repr(resource, self)
+        raise ValueError('Cannot parse job resource')
+
+    def list_jobs(self, max_results=None, page_token=None, all_users=None,
+                  state_filter=None):
+        """List jobs for the project associated with this client.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/list
+
+        :type max_results: int
+        :param max_results: maximum number of jobs to return. If not
+                            passed, defaults to a value set by the API.
+
+        :type page_token: string
+        :param page_token: opaque marker for the next "page" of jobs. If
+                           not passed, the API will return the first page of
+                           jobs.
+
+        :type all_users: boolean
+        :param all_users: if True, include jobs owned by all users in the
+                          project.
+
+        :type state_filter: string
+        :param state_filter: if passed, include only jobs matching the given
+                             state. One of "done", "pending", or "running".
+
+        :rtype: tuple, (list, str)
+        :returns: list of job instances, plus a "next page token" string:
+                  if the token is not None, more jobs can be retrieved
+                  with another call, passing that value as ``page_token``.
+        """
+        params = {}
+
+        if max_results is not None:
+            params['maxResults'] = max_results
+
+        if page_token is not None:
+            params['pageToken'] = page_token
+
+        if all_users is not None:
+            params['allUsers'] = all_users
+
+        if state_filter is not None:
+            params['stateFilter'] = state_filter
+
+        path = '/projects/%s/jobs' % (self.project,)
+        resp = self.connection.api_request(method='GET', path=path,
+                                           query_params=params)
+        jobs = [self._job_from_resource(resource)
+                for resource in resp.get('jobs', ())]
+        return jobs, resp.get('nextPageToken')
+
     def load_table_from_storage(self, job_name, destination, *source_uris):
         """Construct a job for loading data into a table from CloudStorage.
 
diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py
index aff85f9e7f4b..d0feb2dd489d 100644
--- a/gcloud/bigquery/job.py
+++ b/gcloud/bigquery/job.py
@@ -269,6 +269,31 @@ def _set_properties(self, api_response):
         self._properties.clear()
         self._properties.update(cleaned)
 
+    @classmethod
+    def _get_resource_config(cls, resource):
+        """Helper for :meth:`from_api_repr`.
+
+        :type resource: dict
+        :param resource: resource for the job
+
+        :rtype: tuple
+        :returns: tuple (string, dict), where the first element is the
+                  job name and the second contains job-specific configuration.
+        :raises: :class:`KeyError` if the resource has no identifier, or
+                 is missing the appropriate configuration.
+        """
+        if ('jobReference' not in resource or
+                'jobId' not in resource['jobReference']):
+            raise KeyError('Resource lacks required identity information: '
+                           '["jobReference"]["jobId"]')
+        name = resource['jobReference']['jobId']
+        if ('configuration' not in resource or
+                cls._CONFIG_KEY not in resource['configuration']):
+            raise KeyError('Resource lacks required configuration: '
+                           '["configuration"]["%s"]' % cls._CONFIG_KEY)
+        config = resource['configuration'][cls._CONFIG_KEY]
+        return name, config
+
     def begin(self, client=None):
         """API call: begin the job via a POST request
 
@@ -378,6 +403,7 @@ class LoadTableFromStorageJob(_AsyncJob):
     """
 
     _schema = None
+    _CONFIG_KEY = 'load'
 
     def __init__(self, name, destination, source_uris, client, schema=()):
         super(LoadTableFromStorageJob, self).__init__(name, client)
@@ -542,7 +568,7 @@ def _build_resource(self):
                 'jobId': self.name,
             },
             'configuration': {
-                'load': {
+                self._CONFIG_KEY: {
                     'sourceUris': self.source_uris,
                     'destinationTable': {
                         'projectId': self.destination.project,
@@ -552,7 +578,7 @@ def _build_resource(self):
                 },
             },
         }
-        configuration = resource['configuration']['load']
+        configuration = resource['configuration'][self._CONFIG_KEY]
         self._populate_config_resource(configuration)
 
         if len(self.schema) > 0:
@@ -566,6 +592,34 @@ def _scrub_local_properties(self, cleaned):
         schema = cleaned.pop('schema', {'fields': ()})
         self.schema = _parse_schema_resource(schema)
 
+    @classmethod
+    def from_api_repr(cls, resource, client):
+        """Factory: construct a job given its API representation
+
+        .. note::
+
+           This method assumes that the project found in the resource matches
+           the client's project.
+
+        :type resource: dict
+        :param resource: job resource representation returned from the API
+
+        :type client: :class:`gcloud.bigquery.client.Client`
+        :param client: Client which holds credentials and project
+                       configuration for the job.
+
+        :rtype: :class:`gcloud.bigquery.job.LoadTableFromStorageJob`
+        :returns: Job parsed from ``resource``.
+        """
+        name, config = cls._get_resource_config(resource)
+        dest_config = config['destinationTable']
+        dataset = Dataset(dest_config['datasetId'], client)
+        destination = Table(dest_config['tableId'], dataset)
+        source_uris = config['sourceUris']
+        job = cls(name, destination, source_uris, client=client)
+        job._set_properties(resource)
+        return job
+
 
 class _CopyConfiguration(object):
     """User-settable configuration options for copy jobs.
@@ -592,6 +646,9 @@ class CopyJob(_AsyncJob):
     :param client: A client which holds credentials and project
                    configuration for the dataset (which requires a project).
     """
+
+    _CONFIG_KEY = 'copy'
+
     def __init__(self, name, destination, sources, client):
         super(CopyJob, self).__init__(name, client)
         self.destination = destination
@@ -630,7 +687,7 @@ def _build_resource(self):
                 'jobId': self.name,
             },
             'configuration': {
-                'copy': {
+                self._CONFIG_KEY: {
                     'sourceTables': source_refs,
                     'destinationTable': {
                         'projectId': self.destination.project,
@@ -640,11 +697,42 @@ def _build_resource(self):
                 },
             },
         }
-        configuration = resource['configuration']['copy']
+        configuration = resource['configuration'][self._CONFIG_KEY]
         self._populate_config_resource(configuration)
 
         return resource
 
+    @classmethod
+    def from_api_repr(cls, resource, client):
+        """Factory: construct a job given its API representation
+
+        .. note::
+
+           This method assumes that the project found in the resource matches
+           the client's project.
+
+        :type resource: dict
+        :param resource: job resource representation returned from the API
+
+        :type client: :class:`gcloud.bigquery.client.Client`
+        :param client: Client which holds credentials and project
+                       configuration for the job.
+
+        :rtype: :class:`gcloud.bigquery.job.CopyJob`
+        :returns: Job parsed from ``resource``.
+        """
+        name, config = cls._get_resource_config(resource)
+        dest_config = config['destinationTable']
+        dataset = Dataset(dest_config['datasetId'], client)
+        destination = Table(dest_config['tableId'], dataset)
+        sources = []
+        for source_config in config['sourceTables']:
+            dataset = Dataset(source_config['datasetId'], client)
+            sources.append(Table(source_config['tableId'], dataset))
+        job = cls(name, destination, sources, client=client)
+        job._set_properties(resource)
+        return job
+
 
 class _ExtractConfiguration(object):
     """User-settable configuration options for extract jobs.
@@ -675,6 +763,8 @@ class ExtractTableToStorageJob(_AsyncJob):
     :param client: A client which holds credentials and project
                    configuration for the dataset (which requires a project).
     """
+    _CONFIG_KEY = 'extract'
+
     def __init__(self, name, source, destination_uris, client):
         super(ExtractTableToStorageJob, self).__init__(name, client)
         self.source = source
@@ -727,17 +817,45 @@ def _build_resource(self):
                 'jobId': self.name,
             },
             'configuration': {
-                'extract': {
+                self._CONFIG_KEY: {
                     'sourceTable': source_ref,
                     'destinationUris': self.destination_uris,
                 },
             },
         }
-        configuration = resource['configuration']['extract']
+        configuration = resource['configuration'][self._CONFIG_KEY]
        self._populate_config_resource(configuration)
 
         return resource
 
+    @classmethod
+    def from_api_repr(cls, resource, client):
+        """Factory: construct a job given its API representation
+
+        .. note::
+
+           This method assumes that the project found in the resource matches
+           the client's project.
+
+        :type resource: dict
+        :param resource: job resource representation returned from the API
+
+        :type client: :class:`gcloud.bigquery.client.Client`
+        :param client: Client which holds credentials and project
+                       configuration for the job.
+
+        :rtype: :class:`gcloud.bigquery.job.ExtractTableToStorageJob`
+        :returns: Job parsed from ``resource``.
+        """
+        name, config = cls._get_resource_config(resource)
+        source_config = config['sourceTable']
+        dataset = Dataset(source_config['datasetId'], client)
+        source = Table(source_config['tableId'], dataset)
+        destination_uris = config['destinationUris']
+        job = cls(name, source, destination_uris, client=client)
+        job._set_properties(resource)
+        return job
+
 
 class _AsyncQueryConfiguration(object):
     """User-settable configuration options for asynchronous query jobs.
@@ -747,7 +865,7 @@ class _AsyncQueryConfiguration(object):
     _allow_large_results = None
     _create_disposition = None
     _default_dataset = None
-    _destination_table = None
+    _destination = None
     _flatten_results = None
     _priority = None
     _use_query_cache = None
@@ -767,6 +885,8 @@ class QueryJob(_AsyncJob):
     :param client: A client which holds credentials and project
                    configuration for the dataset (which requires a project).
""" + _CONFIG_KEY = 'query' + def __init__(self, name, query, client): super(QueryJob, self).__init__(name, client) self.query = query @@ -787,7 +907,7 @@ def __init__(self, name, query, client): https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.defaultDataset """ - destination_table = _TypedProperty('destination_table', Table) + destination = _TypedProperty('destination', Table) """See: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.destinationTable """ @@ -813,11 +933,11 @@ def __init__(self, name, query, client): """ def _destination_table_resource(self): - if self.destination_table is not None: + if self.destination is not None: return { - 'projectId': self.destination_table.project, - 'datasetId': self.destination_table.dataset_name, - 'tableId': self.destination_table.name, + 'projectId': self.destination.project, + 'datasetId': self.destination.dataset_name, + 'tableId': self.destination.name, } def _populate_config_resource(self, configuration): @@ -831,7 +951,7 @@ def _populate_config_resource(self, configuration): 'projectId': self.default_dataset.project, 'datasetId': self.default_dataset.name, } - if self.destination_table is not None: + if self.destination is not None: table_res = self._destination_table_resource() configuration['destinationTable'] = table_res if self.flatten_results is not None: @@ -852,27 +972,52 @@ def _build_resource(self): 'jobId': self.name, }, 'configuration': { - 'query': { + self._CONFIG_KEY: { 'query': self.query, }, }, } - configuration = resource['configuration']['query'] + configuration = resource['configuration'][self._CONFIG_KEY] self._populate_config_resource(configuration) return resource def _scrub_local_properties(self, cleaned): - """Helper: handle subclass properties in cleaned.""" + """Helper: handle subclass properties in cleaned. + + .. note: + + This method assumes that the project found in the resource matches + the client's project. + """ configuration = cleaned['configuration']['query'] dest_remote = configuration.get('destinationTable') if dest_remote is None: - if self.destination_table is not None: - del self.destination_table + if self.destination is not None: + del self.destination else: dest_local = self._destination_table_resource() if dest_remote != dest_local: - assert dest_remote['projectId'] == self.project dataset = self._client.dataset(dest_remote['datasetId']) - self.destination_table = dataset.table(dest_remote['tableId']) + self.destination = dataset.table(dest_remote['tableId']) + + @classmethod + def from_api_repr(cls, resource, client): + """Factory: construct a job given its API representation + + :type resource: dict + :param resource: dataset job representation returned from the API + + :type client: :class:`gcloud.bigquery.client.Client` + :param client: Client which holds credentials and project + configuration for the dataset. + + :rtype: :class:`gcloud.bigquery.job.RunAsyncQueryJob` + :returns: Job parsed from ``resource``. 
+ """ + name, config = cls._get_resource_config(resource) + query = config['query'] + job = cls(name, query, client=client) + job._set_properties(resource) + return job diff --git a/gcloud/bigquery/test_client.py b/gcloud/bigquery/test_client.py index d8d069c0f543..017e780292a6 100644 --- a/gcloud/bigquery/test_client.py +++ b/gcloud/bigquery/test_client.py @@ -128,6 +128,165 @@ def test_dataset(self): self.assertEqual(dataset.name, DATASET) self.assertTrue(dataset._client is client) + def test__job_from_resource_unknown_type(self): + PROJECT = 'PROJECT' + creds = _Credentials() + client = self._makeOne(PROJECT, creds) + with self.assertRaises(ValueError): + client._job_from_resource({'configuration': {'nonesuch': {}}}) + + def test_list_jobs_defaults(self): + from gcloud.bigquery.job import LoadTableFromStorageJob + from gcloud.bigquery.job import CopyJob + from gcloud.bigquery.job import ExtractTableToStorageJob + from gcloud.bigquery.job import QueryJob + PROJECT = 'PROJECT' + DATASET = 'test_dataset' + SOURCE_TABLE = 'source_table' + DESTINATION_TABLE = 'destination_table' + QUERY_DESTINATION_TABLE = 'query_destination_table' + SOURCE_URI = 'gs://test_bucket/src_object*' + DESTINATION_URI = 'gs://test_bucket/dst_object*' + JOB_TYPES = { + 'load_job': LoadTableFromStorageJob, + 'copy_job': CopyJob, + 'extract_job': ExtractTableToStorageJob, + 'query_job': QueryJob, + } + PATH = 'projects/%s/jobs' % PROJECT + TOKEN = 'TOKEN' + QUERY = 'SELECT * from test_dataset:test_table' + ASYNC_QUERY_DATA = { + 'id': '%s:%s' % (PROJECT, 'query_job'), + 'jobReference': { + 'projectId': PROJECT, + 'jobId': 'query_job', + }, + 'state': 'DONE', + 'configuration': { + 'query': { + 'query': QUERY, + 'destinationTable': { + 'projectId': PROJECT, + 'datasetId': DATASET, + 'tableId': QUERY_DESTINATION_TABLE, + }, + 'createDisposition': 'CREATE_IF_NEEDED', + 'writeDisposition': 'WRITE_TRUNCATE', + } + }, + } + EXTRACT_DATA = { + 'id': '%s:%s' % (PROJECT, 'extract_job'), + 'jobReference': { + 'projectId': PROJECT, + 'jobId': 'extract_job', + }, + 'state': 'DONE', + 'configuration': { + 'extract': { + 'sourceTable': { + 'projectId': PROJECT, + 'datasetId': DATASET, + 'tableId': SOURCE_TABLE, + }, + 'destinationUris': [DESTINATION_URI], + } + }, + } + COPY_DATA = { + 'id': '%s:%s' % (PROJECT, 'copy_job'), + 'jobReference': { + 'projectId': PROJECT, + 'jobId': 'copy_job', + }, + 'state': 'DONE', + 'configuration': { + 'copy': { + 'sourceTables': [{ + 'projectId': PROJECT, + 'datasetId': DATASET, + 'tableId': SOURCE_TABLE, + }], + 'destinationTable': { + 'projectId': PROJECT, + 'datasetId': DATASET, + 'tableId': DESTINATION_TABLE, + }, + } + }, + } + LOAD_DATA = { + 'id': '%s:%s' % (PROJECT, 'load_job'), + 'jobReference': { + 'projectId': PROJECT, + 'jobId': 'load_job', + }, + 'state': 'DONE', + 'configuration': { + 'load': { + 'destinationTable': { + 'projectId': PROJECT, + 'datasetId': DATASET, + 'tableId': SOURCE_TABLE, + }, + 'sourceUris': [SOURCE_URI], + } + }, + } + DATA = { + 'nextPageToken': TOKEN, + 'jobs': [ + ASYNC_QUERY_DATA, + EXTRACT_DATA, + COPY_DATA, + LOAD_DATA, + ] + } + creds = _Credentials() + client = self._makeOne(PROJECT, creds) + conn = client.connection = _Connection(DATA) + + jobs, token = client.list_jobs() + + self.assertEqual(len(jobs), len(DATA['jobs'])) + for found, expected in zip(jobs, DATA['jobs']): + name = expected['jobReference']['jobId'] + self.assertTrue(isinstance(found, JOB_TYPES[name])) + self.assertEqual(found.job_id, expected['id']) + self.assertEqual(token, TOKEN) + + 
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'GET')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        self.assertEqual(req['query_params'], {})
+
+    def test_list_jobs_explicit_empty(self):
+        PROJECT = 'PROJECT'
+        PATH = 'projects/%s/jobs' % PROJECT
+        DATA = {'jobs': []}
+        TOKEN = 'TOKEN'
+        creds = _Credentials()
+        client = self._makeOne(PROJECT, creds)
+        conn = client.connection = _Connection(DATA)
+
+        jobs, token = client.list_jobs(max_results=1000, page_token=TOKEN,
+                                       all_users=True, state_filter='done')
+
+        self.assertEqual(len(jobs), 0)
+        self.assertEqual(token, None)
+
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'GET')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        self.assertEqual(req['query_params'],
+                         {'maxResults': 1000,
+                          'pageToken': TOKEN,
+                          'allUsers': True,
+                          'stateFilter': 'done'})
+
     def test_load_table_from_storage(self):
         from gcloud.bigquery.job import LoadTableFromStorageJob
         PROJECT = 'PROJECT'
diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py
index 63bdea1a02fd..ee1fadb85372 100644
--- a/gcloud/bigquery/test_job.py
+++ b/gcloud/bigquery/test_job.py
@@ -140,6 +140,13 @@ def _setUpConstants(self):
     def _makeResource(self, started=False, ended=False):
         resource = super(TestLoadTableFromStorageJob, self)._makeResource(
             started, ended)
+        config = resource['configuration']['load']
+        config['sourceUris'] = [self.SOURCE1]
+        config['destinationTable'] = {
+            'projectId': self.PROJECT,
+            'datasetId': self.DS_NAME,
+            'tableId': self.TABLE_NAME,
+        }
 
         if ended:
             resource['statistics']['load']['inputFiles'] = self.INPUT_FILES
@@ -196,6 +203,13 @@ def _verifyResourceProperties(self, job, resource):
         self._verifyBooleanConfigProperties(job, config)
         self._verifyEnumConfigProperties(job, config)
 
+        self.assertEqual(job.source_uris, config['sourceUris'])
+
+        table_ref = config['destinationTable']
+        self.assertEqual(job.destination.project, table_ref['projectId'])
+        self.assertEqual(job.destination.dataset_name, table_ref['datasetId'])
+        self.assertEqual(job.destination.name, table_ref['tableId'])
+
         if 'fieldDelimiter' in config:
             self.assertEqual(job.field_delimiter, config['fieldDelimiter'])
@@ -349,6 +363,61 @@ def test_props_set_by_server(self):
         self.assertEqual(job.errors, [ERROR_RESULT])
         self.assertEqual(job.state, 'STATE')
 
+    def test_from_api_repr_missing_identity(self):
+        self._setUpConstants()
+        client = _Client(self.PROJECT)
+        RESOURCE = {}
+        klass = self._getTargetClass()
+        with self.assertRaises(KeyError):
+            klass.from_api_repr(RESOURCE, client=client)
+
+    def test_from_api_repr_missing_config(self):
+        self._setUpConstants()
+        client = _Client(self.PROJECT)
+        RESOURCE = {
+            'id': '%s:%s' % (self.PROJECT, self.DS_NAME),
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            }
+        }
+        klass = self._getTargetClass()
+        with self.assertRaises(KeyError):
+            klass.from_api_repr(RESOURCE, client=client)
+
+    def test_from_api_repr_bare(self):
+        self._setUpConstants()
+        client = _Client(self.PROJECT)
+        RESOURCE = {
+            'id': self.JOB_ID,
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            },
+            'configuration': {
+                'load': {
+                    'sourceUris': [self.SOURCE1],
+                    'destinationTable': {
+                        'projectId': self.PROJECT,
+                        'datasetId': self.DS_NAME,
+                        'tableId': self.TABLE_NAME,
+                    },
+                }
+            },
+        }
+        klass = self._getTargetClass()
+        job = klass.from_api_repr(RESOURCE, client=client)
+        self.assertTrue(job._client is client)
+        self._verifyResourceProperties(job, RESOURCE)
+
+    def test_from_api_repr_w_properties(self):
+        client = _Client(self.PROJECT)
+        RESOURCE = self._makeResource()
+        klass = self._getTargetClass()
+        job = klass.from_api_repr(RESOURCE, client=client)
+        self.assertTrue(job._client is client)
+        self._verifyResourceProperties(job, RESOURCE)
+
     def test_begin_w_bound_client(self):
         PATH = 'projects/%s/jobs' % self.PROJECT
         RESOURCE = self._makeResource()
@@ -569,11 +638,40 @@ def _getTargetClass(self):
         from gcloud.bigquery.job import CopyJob
         return CopyJob
 
+    def _makeResource(self, started=False, ended=False):
+        resource = super(TestCopyJob, self)._makeResource(
+            started, ended)
+        config = resource['configuration']['copy']
+        config['sourceTables'] = [{
+            'projectId': self.PROJECT,
+            'datasetId': self.DS_NAME,
+            'tableId': self.SOURCE_TABLE,
+        }]
+        config['destinationTable'] = {
+            'projectId': self.PROJECT,
+            'datasetId': self.DS_NAME,
+            'tableId': self.DESTINATION_TABLE,
+        }
+
+        return resource
+
     def _verifyResourceProperties(self, job, resource):
         self._verifyReadonlyResourceProperties(job, resource)
 
         config = resource.get('configuration', {}).get('copy')
 
+        table_ref = config['destinationTable']
+        self.assertEqual(job.destination.project, table_ref['projectId'])
+        self.assertEqual(job.destination.dataset_name, table_ref['datasetId'])
+        self.assertEqual(job.destination.name, table_ref['tableId'])
+
+        sources = config['sourceTables']
+        self.assertEqual(len(sources), len(job.sources))
+        for table_ref, table in zip(sources, job.sources):
+            self.assertEqual(table.project, table_ref['projectId'])
+            self.assertEqual(table.dataset_name, table_ref['datasetId'])
+            self.assertEqual(table.name, table_ref['tableId'])
+
         if 'createDisposition' in config:
             self.assertEqual(job.create_disposition,
                              config['createDisposition'])
@@ -604,6 +702,65 @@ def test_ctor(self):
         self.assertTrue(job.create_disposition is None)
         self.assertTrue(job.write_disposition is None)
 
+    def test_from_api_repr_missing_identity(self):
+        self._setUpConstants()
+        client = _Client(self.PROJECT)
+        RESOURCE = {}
+        klass = self._getTargetClass()
+        with self.assertRaises(KeyError):
+            klass.from_api_repr(RESOURCE, client=client)
+
+    def test_from_api_repr_missing_config(self):
+        self._setUpConstants()
+        client = _Client(self.PROJECT)
+        RESOURCE = {
+            'id': '%s:%s' % (self.PROJECT, self.DS_NAME),
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            }
+        }
+        klass = self._getTargetClass()
+        with self.assertRaises(KeyError):
+            klass.from_api_repr(RESOURCE, client=client)
+
+    def test_from_api_repr_bare(self):
+        self._setUpConstants()
+        client = _Client(self.PROJECT)
+        RESOURCE = {
+            'id': self.JOB_ID,
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            },
+            'configuration': {
+                'copy': {
+                    'sourceTables': [{
+                        'projectId': self.PROJECT,
+                        'datasetId': self.DS_NAME,
+                        'tableId': self.SOURCE_TABLE,
+                    }],
+                    'destinationTable': {
+                        'projectId': self.PROJECT,
+                        'datasetId': self.DS_NAME,
+                        'tableId': self.DESTINATION_TABLE,
+                    },
+                }
+            },
+        }
+        klass = self._getTargetClass()
+        job = klass.from_api_repr(RESOURCE, client=client)
+        self.assertTrue(job._client is client)
+        self._verifyResourceProperties(job, RESOURCE)
+
+    def test_from_api_repr_w_properties(self):
+        client = _Client(self.PROJECT)
+        RESOURCE = self._makeResource()
+        klass = self._getTargetClass()
+        job = klass.from_api_repr(RESOURCE, client=client)
+        self.assertTrue(job._client is client)
+        self._verifyResourceProperties(job, RESOURCE)
+
     def test_begin_w_bound_client(self):
         PATH = 'projects/%s/jobs' % self.PROJECT
         RESOURCE = self._makeResource()
@@ -777,11 +934,30 @@ def _getTargetClass(self):
         from gcloud.bigquery.job import ExtractTableToStorageJob
         return ExtractTableToStorageJob
 
+    def _makeResource(self, started=False, ended=False):
+        resource = super(TestExtractTableToStorageJob, self)._makeResource(
+            started, ended)
+        config = resource['configuration']['extract']
+        config['sourceTable'] = {
+            'projectId': self.PROJECT,
+            'datasetId': self.DS_NAME,
+            'tableId': self.SOURCE_TABLE,
+        }
+        config['destinationUris'] = [self.DESTINATION_URI]
+        return resource
+
     def _verifyResourceProperties(self, job, resource):
         self._verifyReadonlyResourceProperties(job, resource)
 
         config = resource.get('configuration', {}).get('extract')
 
+        self.assertEqual(job.destination_uris, config['destinationUris'])
+
+        table_ref = config['sourceTable']
+        self.assertEqual(job.source.project, table_ref['projectId'])
+        self.assertEqual(job.source.dataset_name, table_ref['datasetId'])
+        self.assertEqual(job.source.name, table_ref['tableId'])
+
         if 'compression' in config:
             self.assertEqual(job.compression,
                              config['compression'])
@@ -826,6 +1002,61 @@ def test_ctor(self):
         self.assertTrue(job.field_delimiter is None)
         self.assertTrue(job.print_header is None)
 
+    def test_from_api_repr_missing_identity(self):
+        self._setUpConstants()
+        client = _Client(self.PROJECT)
+        RESOURCE = {}
+        klass = self._getTargetClass()
+        with self.assertRaises(KeyError):
+            klass.from_api_repr(RESOURCE, client=client)
+
+    def test_from_api_repr_missing_config(self):
+        self._setUpConstants()
+        client = _Client(self.PROJECT)
+        RESOURCE = {
+            'id': '%s:%s' % (self.PROJECT, self.DS_NAME),
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            }
+        }
+        klass = self._getTargetClass()
+        with self.assertRaises(KeyError):
+            klass.from_api_repr(RESOURCE, client=client)
+
+    def test_from_api_repr_bare(self):
+        self._setUpConstants()
+        client = _Client(self.PROJECT)
+        RESOURCE = {
+            'id': self.JOB_ID,
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            },
+            'configuration': {
+                'extract': {
+                    'sourceTable': {
+                        'projectId': self.PROJECT,
+                        'datasetId': self.DS_NAME,
+                        'tableId': self.SOURCE_TABLE,
+                    },
+                    'destinationUris': [self.DESTINATION_URI],
+                }
+            },
+        }
+        klass = self._getTargetClass()
+        job = klass.from_api_repr(RESOURCE, client=client)
+        self.assertTrue(job._client is client)
+        self._verifyResourceProperties(job, RESOURCE)
+
+    def test_from_api_repr_w_properties(self):
+        client = _Client(self.PROJECT)
+        RESOURCE = self._makeResource()
+        klass = self._getTargetClass()
+        job = klass.from_api_repr(RESOURCE, client=client)
+        self.assertTrue(job._client is client)
+        self._verifyResourceProperties(job, RESOURCE)
+
     def test_begin_w_bound_client(self):
         PATH = 'projects/%s/jobs' % self.PROJECT
         RESOURCE = self._makeResource()
@@ -989,11 +1220,19 @@ def test_reload_w_alternate_client(self):
 
 class TestQueryJob(unittest2.TestCase, _Base):
     JOB_TYPE = 'query'
     QUERY = 'select count(*) from persons'
+    DESTINATION_TABLE = 'destination_table'
 
     def _getTargetClass(self):
         from gcloud.bigquery.job import QueryJob
         return QueryJob
 
+    def _makeResource(self, started=False, ended=False):
+        resource = super(TestQueryJob, self)._makeResource(
+            started, ended)
+        config = resource['configuration']['query']
+        config['query'] = self.QUERY
+        return resource
+
     def _verifyBooleanResourceProperties(self, job, config):
 
         if 'allowLargeResults' in config:
@@ -1033,7 +1272,7 @@ def _verifyResourceProperties(self, job, resource):
         else:
             self.assertTrue(job.default_dataset is None)
         if 'destinationTable' in config:
-            table = job.destination_table
+            table = job.destination
             tb_ref = {
                 'projectId': table.project,
                 'datasetId': table.dataset_name,
@@ -1041,7 +1280,7 @@ def _verifyResourceProperties(self, job, resource):
             }
             self.assertEqual(tb_ref, config['destinationTable'])
         else:
-            self.assertTrue(job.destination_table is None)
+            self.assertTrue(job.destination is None)
         if 'priority' in config:
             self.assertEqual(job.priority, config['priority'])
@@ -1068,12 +1307,65 @@ def test_ctor(self):
         self.assertTrue(job.allow_large_results is None)
         self.assertTrue(job.create_disposition is None)
         self.assertTrue(job.default_dataset is None)
-        self.assertTrue(job.destination_table is None)
+        self.assertTrue(job.destination is None)
         self.assertTrue(job.flatten_results is None)
         self.assertTrue(job.priority is None)
         self.assertTrue(job.use_query_cache is None)
         self.assertTrue(job.write_disposition is None)
 
+    def test_from_api_repr_missing_identity(self):
+        self._setUpConstants()
+        client = _Client(self.PROJECT)
+        RESOURCE = {}
+        klass = self._getTargetClass()
+        with self.assertRaises(KeyError):
+            klass.from_api_repr(RESOURCE, client=client)
+
+    def test_from_api_repr_missing_config(self):
+        self._setUpConstants()
+        client = _Client(self.PROJECT)
+        RESOURCE = {
+            'id': '%s:%s' % (self.PROJECT, self.DS_NAME),
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            }
+        }
+        klass = self._getTargetClass()
+        with self.assertRaises(KeyError):
+            klass.from_api_repr(RESOURCE, client=client)
+
+    def test_from_api_repr_bare(self):
+        self._setUpConstants()
+        client = _Client(self.PROJECT)
+        RESOURCE = {
+            'id': self.JOB_ID,
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            },
+            'configuration': {
+                'query': {'query': self.QUERY}
+            },
+        }
+        klass = self._getTargetClass()
+        job = klass.from_api_repr(RESOURCE, client=client)
+        self.assertTrue(job._client is client)
+        self._verifyResourceProperties(job, RESOURCE)
+
+    def test_from_api_repr_w_properties(self):
+        client = _Client(self.PROJECT)
+        RESOURCE = self._makeResource()
+        RESOURCE['configuration']['query']['destinationTable'] = {
+            'projectId': self.PROJECT,
+            'datasetId': self.DS_NAME,
+            'tableId': self.DESTINATION_TABLE,
+        }
+        klass = self._getTargetClass()
+        job = klass.from_api_repr(RESOURCE, client=client)
+        self.assertTrue(job._client is client)
+        self._verifyResourceProperties(job, RESOURCE)
+
     def test_begin_w_bound_client(self):
         PATH = 'projects/%s/jobs' % self.PROJECT
         RESOURCE = self._makeResource()
@@ -1144,7 +1436,7 @@ def test_begin_w_alternate_client(self):
         job.allow_large_results = True
         job.create_disposition = 'CREATE_NEVER'
         job.default_dataset = dataset
-        job.destination_table = table
+        job.destination = table
         job.flatten_results = True
         job.priority = 'INTERACTIVE'
         job.use_query_cache = True
@@ -1213,11 +1505,11 @@ def test_reload_w_bound_client(self):
 
         dataset = Dataset(DS_NAME, client)
         table = Table(DEST_TABLE, dataset)
-        job.destination_table = table
+        job.destination = table
 
         job.reload()
 
-        self.assertEqual(job.destination_table, None)
+        self.assertEqual(job.destination, None)
         self.assertEqual(len(conn._requested), 1)
         req = conn._requested[0]
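
For reviewers, a minimal usage sketch of the new ``list_jobs`` API (the project name here is hypothetical, for illustration only; ``Client`` picks up credentials from the environment as usual). Each returned item is whichever of the four async job classes ``_job_from_resource`` selected, reconstructed through its ``from_api_repr`` factory:

    from gcloud import bigquery

    client = bigquery.Client(project='my-project')  # hypothetical project

    page_token = None
    while True:
        # One page of jobs per call, plus an opaque token for the next page.
        jobs, page_token = client.list_jobs(state_filter='done',
                                            page_token=page_token)
        for job in jobs:
            # ``job`` is a LoadTableFromStorageJob, CopyJob,
            # ExtractTableToStorageJob, or QueryJob instance.
            print(job.name, job.state)
        if page_token is None:
            break

Returning a ``(jobs, token)`` tuple keeps ``list_jobs`` consistent with the other ``list_*`` methods in the package, so callers control paging rather than the client eagerly fetching every job.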