Hello folks!
We recently received a case letting us know that Dataproc 2.1.1 was unable to write to a BigQuery table with a column of type JSON. Although the BigQuery connector for Spark has supported JSON columns since release 0.28.0, the Dataproc images on the 2.1 line still cannot create tables with JSON columns or write to existing tables with JSON columns.
The customer has graciously granted us permission to share the code we developed to enable this operation. So if you are interested in working with JSON-typed columns on Dataproc 2.1, please continue reading!
Use the following gcloud command to create your single-node Dataproc cluster:
IMAGE_VERSION=2.1.1-debian11
REGION=us-west1
ZONE=${REGION}-a
CLUSTER_NAME=pick-a-cluster-name

gcloud dataproc clusters create ${CLUSTER_NAME} \
    --region ${REGION} \
    --zone ${ZONE} \
    --single-node \
    --master-machine-type n1-standard-4 \
    --master-boot-disk-type pd-ssd \
    --master-boot-disk-size 50 \
    --image-version ${IMAGE_VERSION} \
    --max-idle=90m \
    --enable-component-gateway \
    --scopes 'https://www.googleapis.com/auth/cloud-platform'
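Once the cluster is up, you will run the remaining steps from its master node. A minimal sketch of connecting to it, assuming the default Dataproc naming convention in which the master node is called ${CLUSTER_NAME}-m:

# Connect to the single node of the cluster; the master of a Dataproc
# cluster is named <cluster-name>-m by default.
gcloud compute ssh ${CLUSTER_NAME}-m --zone ${ZONE}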
The following file, Main.scala, contains the Scala code used to write JSON-structured data to a BigQuery table with Spark. The shell script that follows it can be executed from your single-node Dataproc cluster.
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.avro
import org.apache.avro.specific

val env = "x"
val my_bucket = "cjac-docker-on-yarn"
val my_table = "dataset.testavro2"

val spark = env match {
  case "local" =>
    SparkSession
      .builder()
      .config("temporaryGcsBucket", my_bucket)
      .master("local")
      .appName("isssue_115574")
      .getOrCreate()
  case _ =>
    SparkSession
      .builder()
      .config("temporaryGcsBucket", my_bucket)
      .appName("isssue_115574")
      .getOrCreate()
}

// create DF with some data
val someData = Seq(
  Row("""{"name":"name1", "age": 10 }""", "id1"),
  Row("""{"name":"name2", "age": 20 }""", "id2")
)
val schema = StructType(
  Seq(
    StructField("user_age", StringType, true),
    StructField("id", StringType, true)
  )
)

val avroFileName = s"gs://${my_bucket}/issue_115574/someData.avro"

val someDF = spark.createDataFrame(spark.sparkContext.parallelize(someData), schema)
someDF.write.format("avro").mode("overwrite").save(avroFileName)

val avroDF = spark.read.format("avro").load(avroFileName)

// set metadata
val dfJSON = avroDF
  .withColumn("user_age_no_metadata", col("user_age"))
  .withMetadata("user_age", Metadata.fromJson("""{"sqlType":"JSON"}"""))

dfJSON.show()
dfJSON.printSchema

// write to BigQuery
dfJSON.write.format("bigquery")
  .mode(SaveMode.Overwrite)
  .option("writeMethod", "indirect")
  .option("intermediateFormat", "avro")
  .option("useAvroLogicalTypes", "true")
  .option("table", my_table)
  .save()
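If you want to confirm that the write round-trips, you can read the table back through the same connector from the spark-shell session. This is a sketch of a sanity check rather than part of the original reproduction, and it assumes the same my_table value and the same connector jar as above:

// Sanity check (sketch): read the freshly written table back through the
// BigQuery connector and inspect the schema and rows.
val readBack = spark.read
  .format("bigquery")
  .option("table", my_table)
  .load()
readBack.printSchema()
readBack.show(false)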
#!/bin/bash

PROJECT_ID=set-yours-here
DATASET_NAME=dataset
TABLE_NAME=testavro2

# We have to remove all of the existing spark bigquery jars from the local
# filesystem, as we will be using the symbols from the
# spark-3.3-bigquery-0.30.0.jar below.  Having existing jar files on the
# local filesystem will result in those symbols having higher precedence
# than the one loaded with the spark-shell.
sudo find /usr -name 'spark*bigquery*jar' -delete

# Remove the table from the bigquery dataset if it exists
bq rm -f -t $PROJECT_ID:$DATASET_NAME.$TABLE_NAME

# Create the table with a JSON type column
bq mk --table $PROJECT_ID:$DATASET_NAME.$TABLE_NAME \
  user_age:JSON,id:STRING,user_age_no_metadata:STRING

# Load the example Main.scala
spark-shell -i Main.scala \
  --jars /usr/lib/spark/external/spark-avro.jar,gs://spark-lib/bigquery/spark-3.3-bigquery-0.30.0.jar

# Show the table schema when we use `bq mk --table` and then load the avro
bq query --use_legacy_sql=false \
  "SELECT ddl FROM $DATASET_NAME.INFORMATION_SCHEMA.TABLES where table_name='$TABLE_NAME'"

# Remove the table so that we can see that the table is created should it not exist
bq rm -f -t $PROJECT_ID:$DATASET_NAME.$TABLE_NAME

# Dynamically generate a DataFrame, store it to avro, load that avro,
# and write the avro to BigQuery, creating the table if it does not already exist
spark-shell -i Main.scala \
  --jars /usr/lib/spark/external/spark-avro.jar,gs://spark-lib/bigquery/spark-3.3-bigquery-0.30.0.jar

# Show that the table schema does not differ from one created with a bq mk --table
bq query --use_legacy_sql=false \
  "SELECT ddl FROM $DATASET_NAME.INFORMATION_SCHEMA.TABLES where table_name='$TABLE_NAME'"
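Beyond checking the DDL, you may also want to verify that the written values behave as JSON rather than as plain strings. A hedged sketch of such a check using BigQuery's JSON_VALUE function, reusing the variables from the script above:

# Sketch: extract a field from the JSON column to confirm it is queryable as JSON
bq query --use_legacy_sql=false \
  "SELECT id, JSON_VALUE(user_age, '\$.name') AS name FROM $DATASET_NAME.$TABLE_NAME"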
Google BigQuery has supported the JSON data type since October 2022, but until now it has not been possible to interact with these columns from generally available Dataproc clusters using the Spark BigQuery connector.
JSON column type support was introduced in spark-bigquery-connector release 0.28.0.