Early Access: Inserting JSON data into BigQuery from Spark on Dataproc


Hello folks!

We recently received a case letting us know that Dataproc 2.1.1 was unable to write to a BigQuery table with a column of type JSON. Although the Spark BigQuery connector has supported JSON columns since release 0.28.0, the Dataproc images on the 2.1 line still cannot create tables with JSON columns or write to existing tables that have them.

The customer has graciously granted us permission to share the code we developed to enable this operation. If you are interested in working with JSON-typed columns on Dataproc 2.1, please read on!

Use the following gcloud command to create your single-node Dataproc cluster:

IMAGE_VERSION=2.1.1-debian11
REGION=us-west1
ZONE=${REGION}-a
CLUSTER_NAME=pick-a-cluster-name
gcloud dataproc clusters create ${CLUSTER_NAME} \
    --region ${REGION} \
    --zone ${ZONE} \
    --single-node \
    --master-machine-type n1-standard-4 \
    --master-boot-disk-type pd-ssd \
    --master-boot-disk-size 50 \
    --image-version ${IMAGE_VERSION} \
    --max-idle=90m \
    --enable-component-gateway \
    --scopes 'https://www.googleapis.com/auth/cloud-platform'

The first file below, Main.scala, is the Scala code used to write JSON-structured data to a BigQuery table with Spark. The second file, repro.sh, can be executed on your single-node Dataproc cluster to run the example end to end.

Main.scala:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.avro
import org.apache.avro.specific

  // Set env to "local" to run outside of a Dataproc cluster; any other
  // value (such as "x") defers to the cluster's default master settings.
  val env = "x"
  // Replace my_bucket with a GCS bucket you can write to; my_table should
  // match the dataset and table used in repro.sh.
  val my_bucket = "cjac-docker-on-yarn"
  val my_table = "dataset.testavro2"

  val spark = env match {
    case "local" =>
      SparkSession
        .builder()
        .config("temporaryGcsBucket", my_bucket)
        .master("local")
        .appName("issue_115574")
        .getOrCreate()
    case _ =>
      SparkSession
        .builder()
        .config("temporaryGcsBucket", my_bucket)
        .appName("issue_115574")
        .getOrCreate()
  }

  // create DF with some data
  val someData = Seq(
    Row("""{"name":"name1", "age": 10 }""", "id1"),
    Row("""{"name":"name2", "age": 20 }""", "id2")
  )
  val schema = StructType(
    Seq(
      StructField("user_age", StringType, true),
      StructField("id", StringType, true)
    )
  )
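
  // Note that the JSON payload travels through Spark as an ordinary
  // StringType column (user_age); it is only marked as a BigQuery JSON
  // column later, via column metadata, just before the write.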

  val avroFileName = s"gs://${my_bucket}/issue_115574/someData.avro"
  
  val someDF = spark.createDataFrame(spark.sparkContext.parallelize(someData), schema)
  someDF.write.format("avro").mode("overwrite").save(avroFileName)

  val avroDF = spark.read.format("avro").load(avroFileName)
  // Mark user_age as a BigQuery JSON column by attaching the sqlType=JSON
  // column metadata that the connector recognizes.
  val dfJSON = avroDF
    .withColumn("user_age_no_metadata", col("user_age"))
    .withMetadata("user_age", Metadata.fromJson("""{"sqlType":"JSON"}"""))
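  // Dataset.withMetadata is available starting in Spark 3.3, which is the
  // Spark version shipped on the Dataproc 2.1 image line.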

  dfJSON.show()
  dfJSON.printSchema

  // write to BigQuery
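  // With the "indirect" write method, the connector stages the DataFrame in
  // the temporaryGcsBucket configured above (as Avro, per intermediateFormat)
  // and then issues a BigQuery load job from GCS.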
  dfJSON.write.format("bigquery")
    .mode(SaveMode.Overwrite)
    .option("writeMethod", "indirect")
    .option("intermediateFormat", "avro")
    .option("useAvroLogicalTypes", "true")
    .option("table", my_table)
    .save()
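
To verify the write from Spark itself, a minimal read-back sketch such as the following can be appended to Main.scala or pasted into the same spark-shell session. It reuses the spark session and my_table defined above; the JSON values should come back on the Spark side as JSON-formatted strings:

  // Read the table back through the BigQuery connector and inspect it.
  val readBack = spark.read.format("bigquery")
    .option("table", my_table)
    .load()

  readBack.printSchema
  readBack.show(false)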


repro.sh:

#!/bin/bash

PROJECT_ID=set-yours-here
DATASET_NAME=dataset
TABLE_NAME=testavro2

# We have to remove all of the existing Spark BigQuery connector jars from
# the local filesystem, as we will be using the symbols from
# spark-3.3-bigquery-0.30.0.jar below.  Jars left on the local filesystem
# take precedence over the one loaded with spark-shell.
sudo find /usr -name 'spark*bigquery*jar' -delete

# Remove the table from the bigquery dataset if it exists
bq rm -f -t $PROJECT_ID:$DATASET_NAME.$TABLE_NAME

# Create the table with a JSON type column
bq mk --table $PROJECT_ID:$DATASET_NAME.$TABLE_NAME \
  user_age:JSON,id:STRING,user_age_no_metadata:STRING

# Run the example Main.scala in spark-shell against the pre-created table
spark-shell -i Main.scala \
  --jars /usr/lib/spark/external/spark-avro.jar,gs://spark-lib/bigquery/spark-3.3-bigquery-0.30.0.jar

# Show the table schema when we use `bq mk --table` and then load the avro
bq query --use_legacy_sql=false \
  "SELECT ddl FROM $DATASET_NAME.INFORMATION_SCHEMA.TABLES where table_name='$TABLE_NAME'"

# Remove the table so that we can confirm the connector creates it when it does not already exist
bq rm -f -t $PROJECT_ID:$DATASET_NAME.$TABLE_NAME

# Dynamically generate a DataFrame, store it to avro, load that avro,
# and write the avro to BigQuery, creating the table if it does not already exist

spark-shell -i Main.scala \
  --jars /usr/lib/spark/external/spark-avro.jar,gs://spark-lib/bigquery/spark-3.3-bigquery-0.30.0.jar

# Show that the resulting table schema does not differ from the one created with `bq mk --table`
bq query --use_legacy_sql=false \
  "SELECT ddl FROM $DATASET_NAME.INFORMATION_SCHEMA.TABLES where table_name='$TABLE_NAME'"

Google BigQuery has supported the JSON data type since October of 2022, but until now it has not been possible to interact with JSON columns from generally available Dataproc clusters using the Spark BigQuery connector.

JSON column type support was introduced in spark-bigquery-connector release 0.28.0.
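
As an aside, once the JSON payloads are in a Spark DataFrame as strings, Spark's built-in JSON functions can extract fields from them. Here is a small sketch, assuming a DataFrame shaped like the dfJSON built in Main.scala above:

  import org.apache.spark.sql.functions.{col, get_json_object}

  // Pull individual fields out of the JSON strings with JSONPath expressions.
  dfJSON
    .select(
      col("id"),
      get_json_object(col("user_age"), "$.name").alias("name"),
      get_json_object(col("user_age"), "$.age").alias("age")
    )
    .show()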

