I am trying to run a BigQueryOperator on GCC (Google Cloud Composer). I have already succeeded in running BigQueryCreateEmptyTableOperator and BigQueryTableDeleteOperator. Here is my code for the DAG:

```python
import datetime
import os
import logging
import json
from contextlib import suppress

from airflow import configuration
from airflow import models
from airflow import DAG
from airflow.operators import email_operator
from airflow.operators import python_operator
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import bigquery_check_operator
from airflow.utils import trigger_rule

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    'email_on_failure': True,
    'email_on_retry': True,
    'project_id': 'censored',
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

bq_dataset_name = 'test_tf_blocket'
bq_github_table_id = bq_dataset_name + '.trialtable'

# [START composer_quickstart_schedule]
with models.DAG(
        dag_id='composer_nicholas',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_quickstart_schedule]

    def greeting():
        logging.info('Hello World!')

    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    bq_union_query = bigquery_operator.BigQueryOperator(
        task_id='bq_union_query',
        bql="""
            select * from test_tf_blocket.nicholas_union_query;
        """,
        query_params={})

    email_start = email_operator.EmailOperator(
        task_id='email_it',
        to='[email protected]',
        subject='Sample email',
        html_content="""
            Done.
        """)

    hello_python >> bq_union_query >> email_start
```

The DAG fails when it hits the BigQueryOperator, with this error (log):

```
*** Reading remote log from gs://asia-south1-staging-b017f2bf-bucket/logs/composer_nicholas/bq_union_query/2019-03-21T14:56:45.453098+00:00/30.log.
[2019-03-22 13:12:54,129] {models.py:1361} INFO - Dependencies all met for
[2019-03-22 13:12:54,167] {models.py:1361} INFO - Dependencies all met for
[2019-03-22 13:12:54,168] {models.py:1573} INFO -
-------------------------------------------------------------------------------
Starting attempt 30 of 3
-------------------------------------------------------------------------------
[2019-03-22 13:12:54,199] {models.py:1595} INFO - Executing on 2019-03-21T14:56:45.453098+00:00
[2019-03-22 13:12:54,200] {base_task_runner.py:118} INFO - Running: ['bash', '-c', 'airflow run composer_nicholas bq_union_query 2019-03-21T14:56:45.453098+00:00 --job_id 571 --raw -sd DAGS_FOLDER/nicholas_union_query.py --cfg_path /tmp/tmpn1ic1w_6']
[2019-03-22 13:13:06,400] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:06,400] {settings.py:176} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2019-03-22 13:13:08,433] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:08,431] {default_celery.py:80} WARNING - You have configured a result_backend of redis://airflow-redis-service:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2019-03-22 13:13:08,435] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:08,435] {__init__.py:51} INFO - Using executor CeleryExecutor
[2019-03-22 13:13:09,182] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,181] {app.py:51} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2019-03-22 13:13:09,198] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,198] {configuration.py:516} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-03-22 13:13:09,210] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,210] {configuration.py:516} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-03-22 13:13:09,873] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,873] {models.py:271} INFO - Filling up the DagBag from /home/airflow/gcs/dags/nicholas_union_query.py
[2019-03-22 13:13:12,207] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query /usr/local/lib/airflow/airflow/models.py:2412: PendingDeprecationWarning: Invalid arguments were passed to BigQueryOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
[2019-03-22 13:13:12,208] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query *args: ()
[2019-03-22 13:13:12,208] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query **kwargs: {'api_resource_config': {'useQueryCache': True, 'jobType': 'QUERY'}}
[2019-03-22 13:13:12,208] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query category=PendingDeprecationWarning
[2019-03-22 13:13:12,209] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query /usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py:151: DeprecationWarning: Deprecated parameter `bql` used in Task id: bq_union_query. Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
[2019-03-22 13:13:12,210] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query category=DeprecationWarning)
[2019-03-22 13:13:16,838] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:16,838] {cli.py:484} INFO - Running on host airflow-worker-7c9b9c7f86-xwhg5
[2019-03-22 13:13:17,455] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:17,453] {bigquery_operator.py:159} INFO - Executing:
[2019-03-22 13:13:17,457] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query select * from test_tf_blocket.nicholas_union_query;
[2019-03-22 13:13:17,457] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:17,632] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:17,632] {gcp_api_base_hook.py:92} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2019-03-22 13:13:17,657] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:17,656] {discovery.py:272} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2019-03-22 13:13:18,336] {logging_mixin.py:95} WARNING - /usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py:559: DeprecationWarning: Deprecated parameter `bql` used in `BigQueryBaseCursor.run_query` Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow
category=DeprecationWarning)
[2019-03-22 13:13:18,338] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:18,336] {logging_mixin.py:95} WARNING - /usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py:559: DeprecationWarning: Deprecated parameter `bql` used in `BigQueryBaseCursor.run_query` Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
[2019-03-22 13:13:18,338] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query category=DeprecationWarning)
[2019-03-22 13:13:18,360] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:18,359] {discovery.py:873} INFO - URL being requested: POST https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs?alt=json
[2019-03-22 13:13:18,885] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:18,884] {discovery.py:873} INFO - URL being requested: GET https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs/job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS?alt=json
[2019-03-22 13:13:20,341] {models.py:1760} ERROR - ('BigQuery job status check failed. Final error was: %s', 404)
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1014, in run_with_configuration
    jobId=self.running_job_id).execute()
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execute
    raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError:
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:20,349] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query During handling of the above exception, another exception occurred:
[2019-03-22 13:13:20,349] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:20,349] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Traceback (most recent call last):
[2019-03-22 13:13:20,349] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     result = task_copy.execute(context=context)
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 180, in execute
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     time_partitioning=self.time_partitioning
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 651, in run_query
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     return self.run_with_configuration(configuration)
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1036, in run_with_configuration
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     err.resp.status)
[2019-03-22 13:13:20,351] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Exception: ('BigQuery job status check failed. Final error was: %s', 404)
[2019-03-22 13:13:20,352] {models.py:1783} INFO - Marking task as UP_FOR_RETRY
[2019-03-22 13:13:20,352] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:20,352] {models.py:1783} INFO - Marking task as UP_FOR_RETRY
[2019-03-22 13:13:20,400] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Traceback (most recent call last):
[2019-03-22 13:13:20,400] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1014, in run_with_configuration
[2019-03-22 13:13:20,403] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     jobId=self.running_job_id).execute()
[2019-03-22 13:13:20,405] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
[2019-03-22 13:13:20,406] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     return wrapped(*args, **kwargs)
[2019-03-22 13:13:20,407] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execute
[2019-03-22 13:13:20,408] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     raise HttpError(resp, content, uri=self.uri)
[2019-03-22 13:13:20,409] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query googleapiclient.errors.HttpError:
[2019-03-22 13:13:20,409] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:20,410] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query During handling of the above exception, another exception occurred:
[2019-03-22 13:13:20,411] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:20,411] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Traceback (most recent call last):
[2019-03-22 13:13:20,411] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/bin/airflow", line 7, in
[2019-03-22 13:13:20,412] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     exec(compile(f.read(), __file__, 'exec'))
[2019-03-22 13:13:20,412] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/bin/airflow", line 32, in
[2019-03-22 13:13:20,413] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     args.func(args)
[2019-03-22 13:13:20,414] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/utils/cli.py", line 74, in wrapper
[2019-03-22 13:13:20,414] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     return f(*args, **kwargs)
[2019-03-22 13:13:20,415] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/bin/cli.py", line 490, in run
[2019-03-22 13:13:20,416] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     _run(args, dag, ti)
[2019-03-22 13:13:20,416] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/bin/cli.py", line 406, in _run
[2019-03-22 13:13:20,417] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     pool=args.pool,
[2019-03-22 13:13:20,418] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
[2019-03-22 13:13:20,420] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     return func(*args, **kwargs)
[2019-03-22 13:13:20,421] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
[2019-03-22 13:13:20,421] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     result = task_copy.execute(context=context)
[2019-03-22 13:13:20,421] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 180, in execute
[2019-03-22 13:13:20,422] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     time_partitioning=self.time_partitioning
[2019-03-22 13:13:20,422] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 651, in run_query
[2019-03-22 13:13:20,425] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     return self.run_with_configuration(configuration)
[2019-03-22 13:13:20,425] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1036, in run_with_configuration
[2019-03-22 13:13:20,427] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query     err.resp.status)
[2019-03-22 13:13:20,427] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Exception: ('BigQuery job status check failed. Final error was: %s', 404)
```

If I run a different SQL query, for example a DELETE, it works; I am doing a SELECT query here for simplicity. The point is that the SQL query itself works, but the DAG fails. It seems the DAG fails to retrieve the query/job history from BigQuery. I checked that the job exists, and it does. Here is a screenshot: BQ SS. Initially I thought this was a permission issue, but I checked and the Cloud Composer-generated service account has project Owner and BigQuery Admin rights. I've tried searching around but cannot seem to find an answer. Any help is appreciated.

1 Answer

Since your BigQuery dataset resides in asia-southeast1, BigQuery created the job in that same location by default. However, the Airflow version in your Composer environment tries to fetch the job's status without specifying the location field, so the status check cannot find the job and fails with the 404 you are seeing. Reference: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

This has been fixed by my PR; it has been merged to master and will be released in v2.0.0. However, Composer's latest Airflow version is v1.10.2, so you need a workaround in the meantime. You can extend BigQueryCursor and override the run_with_configuration() function with location support. Please refer to https://github.com/apache/airflow/pull/4695/files#diff-ee06f8fcbc476ea65446a30160c2a2b2R1213 to see how to patch it.
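A minimal sketch of that workaround, assuming Airflow 1.10.2's contrib module layout. The class names (LocationAwareBigQueryCursor, LocationAwareBigQueryHook) are mine, and the method body is a simplified adaptation of the upstream patch rather than its exact code, so adjust it to the constructor signatures in your environment:

```python
import time

from airflow.contrib.hooks.bigquery_hook import BigQueryCursor, BigQueryHook


class LocationAwareBigQueryCursor(BigQueryCursor):
    """BigQueryCursor that creates and polls jobs in an explicit location."""

    def __init__(self, service, project_id, location=None, **kwargs):
        super(LocationAwareBigQueryCursor, self).__init__(
            service, project_id, **kwargs)
        self.location = location  # e.g. 'asia-southeast1'

    def run_with_configuration(self, configuration):
        jobs = self.service.jobs()
        job_data = {'configuration': configuration}
        if self.location:
            # Pin the job itself to the dataset's location.
            job_data['jobReference'] = {'location': self.location}
        job = jobs.insert(projectId=self.project_id, body=job_data).execute()
        self.running_job_id = job['jobReference']['jobId']

        # Poll for completion. Passing `location` to jobs().get() is the
        # actual fix: stock 1.10.2 omits it, which is what produces the
        # 404 in the traceback above.
        get_kwargs = {'projectId': self.project_id,
                      'jobId': self.running_job_id}
        if self.location:
            get_kwargs['location'] = self.location
        while True:
            job = jobs.get(**get_kwargs).execute()
            if job['status']['state'] == 'DONE':
                if 'errorResult' in job['status']:
                    raise Exception('BigQuery job failed: {}'.format(
                        job['status']['errorResult']))
                return self.running_job_id
            time.sleep(5)


class LocationAwareBigQueryHook(BigQueryHook):
    """BigQueryHook that hands out the location-aware cursor."""

    def __init__(self, location=None, **kwargs):
        super(LocationAwareBigQueryHook, self).__init__(**kwargs)
        self.location = location

    def get_cursor(self):
        return LocationAwareBigQueryCursor(
            service=self.get_service(),
            project_id=self._get_field('project'),
            location=self.location)
```

Until the fixed operator ships, you can call this from a PythonOperator in place of the stock BigQueryOperator; for example, with the query from your DAG:

```python
def run_union_query():
    hook = LocationAwareBigQueryHook(location='asia-southeast1')
    hook.get_cursor().run_with_configuration({
        'query': {
            'query': 'select * from test_tf_blocket.nicholas_union_query;',
            'useLegacySql': False,
        },
    })

bq_union_query = python_operator.PythonOperator(
    task_id='bq_union_query',
    python_callable=run_union_query)
```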
