# BigQuery
User can use the BigQuery persistence to write/persist a Spark dataframe to a Google Cloud BigQuery table.
There are two ways to write the dataframe to a BigQuery table:
- Direct Write
- Indirect Write
You can read about the difference between these two approaches here.
## Direct Write
User can configure the BigQuery persistence using the direct write approach in the below manner:
```hocon
persistence = {
  type = "BigQuery"
  service-account-credentials-file = "/Users/xyz/Downloads/creds-file.json"
  table = "project-name:dataset-name.table-name"
  writer-type = {
    type = "DirectBigQueryWriterType"
    http-connect-timeout = 5000
  }
}
```
Apart from `http-connect-timeout`, the user can configure the below other parameters in the `writer-type` (an example combining a few of them is shown after the table):
Parameter Name | Default Value | Description |
---|---|---|
big-query-table-label | Empty List | Can be used to add labels to the table while writing to it. Multiple labels can be set. |
clustered-fields | None | A string of non-repeated, top-level columns separated by a comma. |
create-disposition | CREATE_IF_NEEDED | Specifies whether the job is allowed to create new tables. The permitted values are: CREATE_IF_NEEDED - configures the job to create the table if it does not exist; CREATE_NEVER - configures the job to fail if the table does not exist. This option takes effect only when Spark has decided to write data to the table based on the SaveMode. |
datetime-zone-id | UTC | The time zone ID used to convert BigQuery's DATETIME into Spark's Timestamp, and vice versa. The value should be a legal time zone name that is accepted by Java's java.time.ZoneId. |
destination-table-kms-key-name | None | Describes the Cloud KMS encryption key that will be used to protect the destination BigQuery table. The BigQuery Service Account associated with your project requires access to this encryption key. For further information about using CMEK with BigQuery, see here. The table will be encrypted by the key only if it was created by the connector. A pre-existing unencrypted table won't be encrypted just by setting this option. |
enable-list-inference | false | Indicates whether to use schema inference specifically when the mode is Parquet. |
enable-mode-check-for-schema-fields | true | Checks that the mode of every field in the destination schema is equal to the mode of the corresponding source field, during DIRECT write. |
http-connect-timeout | 6000 | The timeout in milliseconds to establish a connection with BigQuery. Can be alternatively set in the Spark configuration (spark.conf.set("httpConnectTimeout", ...)) or in Hadoop Configuration (fs.gs.http.connect-timeout). |
http-max-retry | 10 | The maximum number of retries for the low-level HTTP requests to BigQuery. Can be alternatively set in the Spark configuration (spark.conf.set("httpMaxRetry", ...)) or in Hadoop Configuration (fs.gs.http.max.retry). |
proxy-address | None | Address of the proxy server. The proxy must be an HTTP proxy, and the address should be in the host:port format. Can be alternatively set in the Spark configuration (spark.conf.set(...)) or in Hadoop Configuration (fs.gs.proxy.address). (Optional. Required only if connecting to GCP via proxy.) |
proxy-username | None | The userName used to connect to the proxy. Can be alternatively set in the Spark configuration (spark.conf.set(...)) or in Hadoop Configuration (fs.gs.proxy.username). |
proxy-password | None | The password used to connect to the proxy. Can be alternatively set in the Spark configuration (spark.conf.set(...)) or in Hadoop Configuration (fs.gs.proxy.password). |
query-job-priority | INTERACTIVE | Priority level set for the job while reading data from a BigQuery query. The permitted values are: BATCH - the query is queued and started as soon as idle resources are available, usually within a few minutes; if the query hasn't started within 3 hours, its priority is changed to INTERACTIVE. INTERACTIVE - the query is executed as soon as possible and counts towards the concurrent rate limit and the daily rate limit. For WRITE, this option is effective when DIRECT write is used with OVERWRITE mode, where the connector overwrites the destination table using a MERGE statement. |
write-at-least-once | false | Guarantees that data is written to BigQuery at least once. This is a lesser guarantee than exactly once. This is suitable for streaming scenarios in which data is continuously being written in small batches. Supported only by the DIRECT write method when the save mode is NOT Overwrite. |
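For illustration, a direct writer combining a few of these optional parameters could be configured as in the sketch below. The clustering columns and parameter values shown are placeholders, not recommendations; the keys follow the table above.

```hocon
persistence = {
  type = "BigQuery"
  service-account-credentials-file = "/Users/xyz/Downloads/creds-file.json"
  table = "project-name:dataset-name.table-name"
  writer-type = {
    type = "DirectBigQueryWriterType"
    http-connect-timeout = 5000
    http-max-retry = 15
    create-disposition = "CREATE_IF_NEEDED"
    # placeholder column names; must be non-repeated, top-level columns
    clustered-fields = "country,city"
    write-at-least-once = true
  }
}
```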
## Indirect Write
User can configure the BigQuery persistence using the indirect write approach in the below manner:
```hocon
persistence = {
  type = "BigQuery"
  service-account-credentials-file = "/Users/xyz/Downloads/creds-file.json"
  table = "project-name:dataset-name.table-name"
  writer-type = {
    type = "IndirectBigQueryWriterType"
    temporary-gcs-bucket = "temp-bucket"
  }
}
```
Apart from `temporary-gcs-bucket`, the user can configure the below other parameters in the `writer-type` (an example combining a few of them is shown after the table):
Parameter Name | Default Value | Description |
---|---|---|
allow-field-addition | false | Adds the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false. |
allow-field-relaxation | false | Adds the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false. |
big-query-table-label | Empty List | Can be used to add labels to the table while writing to it. Multiple labels can be set. |
clustered-fields | None | A string of non-repeated, top-level columns separated by a comma. |
create-disposition | CREATE_IF_NEEDED | Specifies whether the job is allowed to create new tables. The permitted values are: CREATE_IF_NEEDED - configures the job to create the table if it does not exist; CREATE_NEVER - configures the job to fail if the table does not exist. This option takes effect only when Spark has decided to write data to the table based on the SaveMode. |
datetime-zone-id | UTC | The time zone ID used to convert BigQuery's DATETIME into Spark's Timestamp, and vice versa. The value should be a legal time zone name that is accepted by Java's java.time.ZoneId. |
date-partition | None | The date partition the data is going to be written to. Should be a date string given in the format YYYYMMDD. Can be used to overwrite the data of a single partition. Can also be used with other partition types: HOUR (YYYYMMDDHH), MONTH (YYYYMM), YEAR (YYYY). |
destination-table-kms-key-name | None | Describes the Cloud KMS encryption key that will be used to protect the destination BigQuery table. The BigQuery Service Account associated with your project requires access to this encryption key. For further information about using CMEK with BigQuery, see here. The table will be encrypted by the key only if it was created by the connector. A pre-existing unencrypted table won't be encrypted just by setting this option. |
enable-list-inference | false | Indicates whether to use schema inference specifically when the mode is Parquet. |
http-connect-timeout | 6000 | The timeout in milliseconds to establish a connection with BigQuery. Can be alternatively set in the Spark configuration (spark.conf.set("httpConnectTimeout", ...)) or in Hadoop Configuration (fs.gs.http.connect-timeout). |
http-max-retry | 10 | The maximum number of retries for the low-level HTTP requests to BigQuery. Can be alternatively set in the Spark configuration (spark.conf.set("httpMaxRetry", ...)) or in Hadoop Configuration (fs.gs.http.max.retry). |
intermediate-format | parquet | The format of the data before it is loaded to BigQuery; values can be either "parquet", "orc" or "avro". In order to use the Avro format, the spark-avro package must be added at runtime. |
partition-expiration-ms | None | Number of milliseconds for which to keep the storage for partitions in the table. The storage in a partition will have an expiration time of its partition time plus this value. |
partition-field | None | If this field is specified together with partition-type, the table is partitioned by this field. The field must be a top-level TIMESTAMP or DATE field. Its mode must be NULLABLE or REQUIRED. If the option is not set for a partitioned table, then the table will be partitioned by a pseudo column, referenced via either '_PARTITIONTIME' as TIMESTAMP type, or '_PARTITIONDATE' as DATE type. |
partition-type | None | Supported types are: HOUR, DAY, MONTH, YEAR. This option is mandatory for a target table to be partitioned. (Optional. Defaults to DAY if partition-field is specified.) |
persistent-gcs-bucket | None | The GCS bucket that holds the data before it is loaded to BigQuery. If set, the data won't be deleted after it is written to BigQuery. |
persistent-gcs-path | None | The GCS path that holds the data before it is loaded to BigQuery. Used only with persistent-gcs-bucket. |
proxy-address | None | Address of the proxy server. The proxy must be an HTTP proxy, and the address should be in the host:port format. Can be alternatively set in the Spark configuration (spark.conf.set(...)) or in Hadoop Configuration (fs.gs.proxy.address). (Optional. Required only if connecting to GCP via proxy.) |
proxy-username | None | The userName used to connect to the proxy. Can be alternatively set in the Spark configuration (spark.conf.set(...)) or in Hadoop Configuration (fs.gs.proxy.username). |
proxy-password | None | The password used to connect to the proxy. Can be alternatively set in the Spark configuration (spark.conf.set(...)) or in Hadoop Configuration (fs.gs.proxy.password). |
temporary-gcs-bucket | None | The GCS bucket that temporarily holds the data before it is loaded to BigQuery. Required unless set in the Spark configuration (spark.conf.set(...)). |
use-avro-logical-types | false | When loading from Avro (i.e., when intermediate-format is set to "avro"), BigQuery by default uses the underlying Avro types instead of the logical types. Supplying this option converts Avro logical types to their corresponding BigQuery data types. |
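As an illustrative sketch (the bucket name, partition column, and intermediate format are placeholders), an indirect writer that loads the data via ORC into a day-partitioned table could look like:

```hocon
persistence = {
  type = "BigQuery"
  service-account-credentials-file = "/Users/xyz/Downloads/creds-file.json"
  table = "project-name:dataset-name.table-name"
  writer-type = {
    type = "IndirectBigQueryWriterType"
    temporary-gcs-bucket = "temp-bucket"
    intermediate-format = "orc"
    # hypothetical top-level DATE/TIMESTAMP column used for partitioning
    partition-field = "event_date"
    partition-type = "DAY"
    create-disposition = "CREATE_IF_NEEDED"
  }
}
```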
Irrespective of the direct or indirect write approach, the BigQuery persistence needs the below arguments from the user (an example follows the table):
Parameter Name | Mandatory | Default Value | Description |
---|---|---|---|
table | Yes | None | The name of the BigQuery table, in the format [[project:]dataset.]table, where the dataframe needs to be persisted. |
service-account-credentials-file | No | None | The filepath of the GCP service account credentials. |
dataset | No | None | The dataset containing the table. If you are providing the fully qualified name in the table parameter, then you can ignore this option. |
project | No | None | The Google Cloud Project ID of the table. (Optional. Defaults to the project of the Service Account being used) |
parent-project | No | None | The Google Cloud Project ID of the table to bill for the export. (Optional. Defaults to the project of the Service Account being used). |
save-mode | No | ErrorIfExists | This is used to specify the expected behavior of saving a DataFrame to a data source. Expected values are: append, overwrite, errorifexists, ignore. |
writer-type | Yes | None | The instance of the direct or indirect BigQuery writer type. |
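For example, instead of a fully qualified table name, the project and dataset can be supplied separately, together with an explicit save mode. The sketch below is illustrative; the project, dataset, and bucket names are placeholders:

```hocon
persistence = {
  type = "BigQuery"
  service-account-credentials-file = "/Users/xyz/Downloads/creds-file.json"
  table = "table-name"
  dataset = "dataset-name"
  project = "project-name"
  save-mode = "append"
  writer-type = {
    type = "IndirectBigQueryWriterType"
    temporary-gcs-bucket = "temp-bucket"
  }
}
```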
Also, note that writing to BigQuery requires the user to have the below privileges:
Role Name | Purpose |
---|---|
roles/bigquery.dataEditor | Access BigQuery Tables |
roles/bigquery.jobUser | Create and query BigQuery tables |
roles/storage.objectViewer | To list and read GCS contents |
roles/storage.objectCreator | To create folders in GCS |