
BigQuery

The user can use the BigQuery persistence to write/persist a Spark dataframe to a Google Cloud BigQuery table.

There are two ways to write the dataframe to a BigQuery table:

  • Direct Write
  • Indirect Write

You can read about the difference between these two approaches here.

Direct Write

The user can configure the BigQuery persistence using the direct write approach in the below manner:

persistence = {
  type = "BigQuery"
  service-account-credentials-file = "/Users/xyz/Downloads/creds-file.json"
  table = "project-name:dataset-name.table-name"
  writer-type = {
    type = "DirectBigQueryWriterType"
    http-connect-timeout = 5000
  }
}

Apart from http-connect-timeout, the user can configure the following additional parameters in the writer-type (a combined example follows the table):

| Parameter Name | Default Value | Description |
|---|---|---|
| big-query-table-label | Empty List | Can be used to add labels to the table while writing to it. Multiple labels can be set. |
| clustered-fields | None | A string of non-repeated, top-level columns separated by a comma. |
| create-disposition | CREATE_IF_NEEDED | Specifies whether the job is allowed to create new tables. The permitted values are:<br />CREATE_IF_NEEDED - configures the job to create the table if it does not exist.<br />CREATE_NEVER - configures the job to fail if the table does not exist.<br />This option takes effect only if Spark has decided to write data to the table based on the SaveMode. |
| datetime-zone-id | UTC | The time zone ID used to convert BigQuery's DATETIME into Spark's Timestamp, and vice versa. The value should be a legal time zone name accepted by Java's java.time.ZoneId. |
| destination-table-kms-key-name | None | Describes the Cloud KMS encryption key that will be used to protect the destination BigQuery table. The BigQuery service account associated with your project requires access to this encryption key. For further information about using CMEK with BigQuery, see here.<br />The table will be encrypted by the key only if it is created by the connector. A pre-existing unencrypted table won't be encrypted just by setting this option. |
| enable-list-inference | false | Indicates whether to use schema inference specifically when the mode is Parquet. |
| enable-mode-check-for-schema-fields | true | Checks that the mode of every field in the destination schema equals the mode of the corresponding source field, during DIRECT write. |
| http-connect-timeout | 6000 | The timeout in milliseconds to establish a connection with BigQuery. Can alternatively be set in the Spark configuration (spark.conf.set("httpConnectTimeout", ...)) or in the Hadoop configuration (fs.gs.http.connect-timeout). |
| http-max-retry | 10 | The maximum number of retries for the low-level HTTP requests to BigQuery. Can alternatively be set in the Spark configuration (spark.conf.set("httpMaxRetry", ...)) or in the Hadoop configuration (fs.gs.http.max.retry). |
| proxy-address | None | Address of the proxy server. The proxy must be an HTTP proxy and the address should be in the host:port format. Required only if connecting to GCP via a proxy. Can alternatively be set in the Spark configuration (spark.conf.set(...)) or in the Hadoop configuration (fs.gs.proxy.address). |
| proxy-username | None | The username used to connect to the proxy. Can alternatively be set in the Spark configuration (spark.conf.set(...)) or in the Hadoop configuration (fs.gs.proxy.username). |
| proxy-password | None | The password used to connect to the proxy. Can alternatively be set in the Spark configuration (spark.conf.set(...)) or in the Hadoop configuration (fs.gs.proxy.password). |
| query-job-priority | INTERACTIVE | Priority level set for the job while reading data from a BigQuery query. The permitted values are:<br />BATCH - the query is queued and started as soon as idle resources are available, usually within a few minutes. If the query hasn't started within 3 hours, its priority is changed to INTERACTIVE.<br />INTERACTIVE - the query is executed as soon as possible and counts towards the concurrent rate limit and the daily rate limit.<br />For writes, this option is effective when DIRECT write is used with the Overwrite mode, where the connector overwrites the destination table using a MERGE statement. |
| write-at-least-once | false | Guarantees that data is written to BigQuery at least once. This is a lesser guarantee than exactly once. Suitable for streaming scenarios in which data is continuously being written in small batches.<br />Supported only by the DIRECT write method and when the mode is NOT Overwrite. |
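For illustration, a direct-write configuration that combines a few of these optional parameters might look like the sketch below; the clustering columns, retry count, and timeout are placeholder values, not recommendations:

persistence = {
  type = "BigQuery"
  service-account-credentials-file = "/Users/xyz/Downloads/creds-file.json"
  table = "project-name:dataset-name.table-name"
  writer-type = {
    type = "DirectBigQueryWriterType"
    http-connect-timeout = 5000
    http-max-retry = 5                       # retry low-level HTTP calls up to 5 times
    create-disposition = "CREATE_IF_NEEDED"  # create the table if it does not exist
    clustered-fields = "country,city"        # hypothetical top-level columns to cluster by
    write-at-least-once = true               # only valid because the save mode here is not Overwrite
  }
}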

Indirect Write

The user can configure the BigQuery persistence using the indirect write approach in the below manner:

persistence = {
  type = "BigQuery"
  service-account-credentials-file = "/Users/xyz/Downloads/creds-file.json"
  table = "project-name:dataset-name.table-name"
  writer-type = {
    type = "IndirectBigQueryWriterType"
    temporary-gcs-bucket = "temp-bucket"
  }
}

Apart from temporary-gcs-bucket, the user can configure the following additional parameters in the writer-type (a combined example follows the table):

| Parameter Name | Default Value | Description |
|---|---|---|
| allow-field-addition | false | Adds the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false. |
| allow-field-relaxation | false | Adds the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false. |
| big-query-table-label | Empty List | Can be used to add labels to the table while writing to it. Multiple labels can be set. |
| clustered-fields | None | A string of non-repeated, top-level columns separated by a comma. |
| create-disposition | CREATE_IF_NEEDED | Specifies whether the job is allowed to create new tables. The permitted values are:<br />CREATE_IF_NEEDED - configures the job to create the table if it does not exist.<br />CREATE_NEVER - configures the job to fail if the table does not exist.<br />This option takes effect only if Spark has decided to write data to the table based on the SaveMode. |
| datetime-zone-id | UTC | The time zone ID used to convert BigQuery's DATETIME into Spark's Timestamp, and vice versa. The value should be a legal time zone name accepted by Java's java.time.ZoneId. |
| date-partition | None | The date partition the data is going to be written to. Should be a date string given in the format YYYYMMDD. Can be used to overwrite the data of a single partition.<br />Can also be used with different partition types:<br />HOUR: YYYYMMDDHH<br />MONTH: YYYYMM<br />YEAR: YYYY |
| destination-table-kms-key-name | None | Describes the Cloud KMS encryption key that will be used to protect the destination BigQuery table. The BigQuery service account associated with your project requires access to this encryption key. For further information about using CMEK with BigQuery, see here.<br />The table will be encrypted by the key only if it is created by the connector. A pre-existing unencrypted table won't be encrypted just by setting this option. |
| enable-list-inference | false | Indicates whether to use schema inference specifically when the mode is Parquet. |
| http-connect-timeout | 6000 | The timeout in milliseconds to establish a connection with BigQuery. Can alternatively be set in the Spark configuration (spark.conf.set("httpConnectTimeout", ...)) or in the Hadoop configuration (fs.gs.http.connect-timeout). |
| http-max-retry | 10 | The maximum number of retries for the low-level HTTP requests to BigQuery. Can alternatively be set in the Spark configuration (spark.conf.set("httpMaxRetry", ...)) or in the Hadoop configuration (fs.gs.http.max.retry). |
| intermediate-format | parquet | The format of the data before it is loaded to BigQuery; values can be "parquet", "orc" or "avro". In order to use the Avro format, the spark-avro package must be added at runtime. |
| partition-expiration-ms | None | Number of milliseconds for which to keep the storage for partitions in the table. The storage in a partition will have an expiration time of its partition time plus this value. |
| partition-field | None | If this field is specified together with partition-type, the table is partitioned by this field. The field must be a top-level TIMESTAMP or DATE field, and its mode must be NULLABLE or REQUIRED. If the option is not set for a partitioned table, then the table will be partitioned by a pseudo column, referenced via either '_PARTITIONTIME' as TIMESTAMP type or '_PARTITIONDATE' as DATE type. |
| partition-type | None | Supported types are: HOUR, DAY, MONTH, YEAR. This option is mandatory for a target table to be partitioned. Defaults to DAY if partition-field is specified. |
| persistent-gcs-bucket | None | The GCS bucket that holds the data before it is loaded to BigQuery. If provided, the data won't be deleted after it is written into BigQuery. |
| persistent-gcs-path | None | The GCS path that holds the data before it is loaded to BigQuery. Used only with persistent-gcs-bucket. |
| proxy-address | None | Address of the proxy server. The proxy must be an HTTP proxy and the address should be in the host:port format. Required only if connecting to GCP via a proxy. Can alternatively be set in the Spark configuration (spark.conf.set(...)) or in the Hadoop configuration (fs.gs.proxy.address). |
| proxy-username | None | The username used to connect to the proxy. Can alternatively be set in the Spark configuration (spark.conf.set(...)) or in the Hadoop configuration (fs.gs.proxy.username). |
| proxy-password | None | The password used to connect to the proxy. Can alternatively be set in the Spark configuration (spark.conf.set(...)) or in the Hadoop configuration (fs.gs.proxy.password). |
| temporary-gcs-bucket | None | The GCS bucket that temporarily holds the data before it is loaded to BigQuery. Required unless set in the Spark configuration (spark.conf.set(...)). |
| use-avro-logical-types | false | When loading from Avro (intermediate-format set to "avro"), BigQuery uses the underlying Avro types instead of the logical types by default. Supplying this option converts Avro logical types to their corresponding BigQuery data types. |
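As a sketch, an indirect-write configuration that partitions the target table and stages the data as ORC could look as follows; the partition column, bucket name, and format choice are illustrative assumptions:

persistence = {
  type = "BigQuery"
  service-account-credentials-file = "/Users/xyz/Downloads/creds-file.json"
  table = "project-name:dataset-name.table-name"
  writer-type = {
    type = "IndirectBigQueryWriterType"
    temporary-gcs-bucket = "temp-bucket"  # staging bucket for the intermediate files
    intermediate-format = "orc"           # stage the data as ORC instead of the default parquet
    partition-field = "event_date"        # hypothetical top-level DATE column
    partition-type = "DAY"                # partition the target table by day
    allow-field-addition = true           # let the load job add new columns to the schema
  }
}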

Irrespective of the direct or indirect write approach, the BigQuery persistence needs the below arguments from the user (a complete example follows the table):

| Parameter Name | Mandatory | Default Value | Description |
|---|---|---|---|
| table | Yes | None | The name of the BigQuery table, in the format [[project:]dataset.]table, where the dataframe needs to be persisted. |
| service-account-credentials-file | No | None | The file path of the GCP service account credentials. |
| dataset | No | None | The dataset containing the table. If you provide the fully qualified name in the table parameter, you can ignore this option. |
| project | No | None | The Google Cloud Project ID of the table. Defaults to the project of the service account being used. |
| parent-project | No | None | The Google Cloud Project ID of the table to bill for the export. Defaults to the project of the service account being used. |
| save-mode | No | ErrorIfExists | Specifies the expected behavior of saving a DataFrame to a data source. Expected values are append, overwrite, errorifexists, and ignore. |
| writer-type | Yes | None | The instance of the direct or indirect BigQuery writer type. |
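Putting it together, a persistence block that relies on the project and dataset options instead of a fully qualified table name, and overwrites existing data, might look like the sketch below (the project, dataset, and table names are placeholders):

persistence = {
  type = "BigQuery"
  service-account-credentials-file = "/Users/xyz/Downloads/creds-file.json"
  project = "my-gcp-project"   # placeholder GCP project ID
  dataset = "my_dataset"       # placeholder dataset containing the table
  table = "my_table"           # short table name; project and dataset supplied separately
  save-mode = "overwrite"      # replace existing data instead of failing (default is ErrorIfExists)
  writer-type = {
    type = "DirectBigQueryWriterType"
    http-connect-timeout = 5000
  }
}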

Also, note that writing to BigQuery requires the user to have the below roles:

| Role Name | Purpose |
|---|---|
| roles/bigquery.dataEditor | Access BigQuery tables |
| roles/bigquery.jobUser | Create and run BigQuery jobs (queries) |
| roles/storage.objectViewer | List and read GCS contents |
| roles/storage.objectCreator | Create folders and objects in GCS |