{"id":1931,"date":"2023-05-13T20:52:10","date_gmt":"2023-05-14T03:52:10","guid":{"rendered":"https:\/\/wp.c9h.org\/cj\/?p=1931"},"modified":"2023-05-13T21:30:19","modified_gmt":"2023-05-14T04:30:19","slug":"early-access-inserting-json-data-to-bigquery-from-spark-on-dataproc","status":"publish","type":"post","link":"https:\/\/wp.c9h.org\/cj\/?p=1931","title":{"rendered":"Early Access: Inserting JSON data to BigQuery from Spark on Dataproc"},"content":{"rendered":"<p>Hello folks!<\/p>\n<p>We recently received a case letting us know that Dataproc 2.1.1 was unable to write to a BigQuery table with a column of type JSON.  Although the BigQuery connector for Spark has had support for JSON columns since 0.28.0, the Dataproc images on the 2.1 line still cannot create tables with JSON columns or write to existing tables with JSON columns.<\/p>\n<p>The customer has graciously granted permission to share the code we developed to allow this operation.  So if you are interested in working with JSON column tables on Dataproc 2.1 please continue reading!<\/p>\n<p>Use the following gcloud command to create your single-node dataproc cluster:<\/p>\n<pre>IMAGE_VERSION=2.1.1-debian11\nREGION=us-west1\nZONE=${REGION}-a\nCLUSTER_NAME=pick-a-cluster-name\ngcloud dataproc clusters create ${CLUSTER_NAME} \\\n    --region ${REGION} \\\n    --zone ${ZONE} \\\n    --single-node \\\n    --master-machine-type n1-standard-4 \\\n    --master-boot-disk-type pd-ssd \\\n    --master-boot-disk-size 50 \\\n    --image-version ${IMAGE_VERSION} \\\n    --max-idle=90m \\\n    --enable-component-gateway \\\n    --scopes 'https:\/\/www.googleapis.com\/auth\/cloud-platform'\n<\/pre>\n<p>The following file is the Scala code used to write JSON structured data to a BigQuery table using Spark.  
The second file, repro.sh, can be executed from your single-node Dataproc cluster.<\/p>\n<p><a href=\"https:\/\/wp.c9h.org\/cj\/wp-content\/uploads\/2023\/05\/Main.scala\">Main.scala<\/a><\/p>\n<pre>import org.apache.spark.sql.functions.col\nimport org.apache.spark.sql.types.{Metadata, StringType, StructField, StructType}\nimport org.apache.spark.sql.{Row, SaveMode, SparkSession}\nimport org.apache.spark.sql.avro\nimport org.apache.avro.specific\n\n  val env = \"x\"\n  val my_bucket = \"cjac-docker-on-yarn\"\n  val my_table = \"dataset.testavro2\"\n  \/\/ Build the session; the \"local\" case is only for running outside Dataproc\n  val spark = env match {\n    case \"local\" =&gt;\n      SparkSession\n        .builder()\n        .config(\"temporaryGcsBucket\", my_bucket)\n        .master(\"local\")\n        .appName(\"issue_115574\")\n        .getOrCreate()\n    case _ =&gt;\n      SparkSession\n        .builder()\n        .config(\"temporaryGcsBucket\", my_bucket)\n        .appName(\"issue_115574\")\n        .getOrCreate()\n  }\n\n  \/\/ Create a DataFrame with some sample data\n  val someData = Seq(\n    Row(\"\"\"{\"name\":\"name1\", \"age\": 10 }\"\"\", \"id1\"),\n    Row(\"\"\"{\"name\":\"name2\", \"age\": 20 }\"\"\", \"id2\")\n  )\n  val schema = StructType(\n    Seq(\n      StructField(\"user_age\", StringType, true),\n      StructField(\"id\", StringType, true)\n    )\n  )\n\n  val avroFileName = s\"gs:\/\/${my_bucket}\/issue_115574\/someData.avro\"\n\n  val someDF = spark.createDataFrame(spark.sparkContext.parallelize(someData), schema)\n  someDF.write.format(\"avro\").mode(\"overwrite\").save(avroFileName)\n\n  val avroDF = spark.read.format(\"avro\").load(avroFileName)\n  \/\/ Mark user_age as a JSON column by attaching the sqlType metadata entry\n  val dfJSON = avroDF\n    .withColumn(\"user_age_no_metadata\", col(\"user_age\"))\n    .withMetadata(\"user_age\", Metadata.fromJson(\"\"\"{\"sqlType\":\"JSON\"}\"\"\"))\n\n  dfJSON.show()\n  dfJSON.printSchema\n\n  \/\/ Write to BigQuery via GCS using the Avro intermediate format\n  dfJSON.write.format(\"bigquery\")\n    .mode(SaveMode.Overwrite)\n    
.option(\"writeMethod\", \"indirect\")\n    .option(\"intermediateFormat\", \"avro\")\n    .option(\"useAvroLogicalTypes\", \"true\")\n    .option(\"table\", my_table)\n    .save()\n\n\n<\/pre>\n<p><a href=\"https:\/\/wp.c9h.org\/cj\/wp-content\/uploads\/2023\/05\/repro.sh\">repro.sh<\/a>:<\/p>\n<pre>#!\/bin\/bash\n\nPROJECT_ID=set-yours-here\nDATASET_NAME=dataset\nTABLE_NAME=testavro2\n\n# We have to remove all of the existing spark bigquery jars from the local\n# filesystem, as we will be using the symbols from the\n# spark-3.3-bigquery-0.30.0.jar below.  Having existing jar files on the\n# local filesystem will result in those symbols having higher precedence\n# than the one loaded with the spark-shell.\nsudo find \/usr -name 'spark*bigquery*jar' -delete\n\n# Remove the table from the bigquery dataset if it exists\nbq rm -f -t $PROJECT_ID:$DATASET_NAME.$TABLE_NAME\n\n# Create the table with a JSON type column\nbq mk --table $PROJECT_ID:$DATASET_NAME.$TABLE_NAME \\\n  user_age:JSON,id:STRING,user_age_no_metadata:STRING\n\n# Load the example Main.scala \nspark-shell -i Main.scala \\\n  --jars \/usr\/lib\/spark\/external\/spark-avro.jar,gs:\/\/spark-lib\/bigquery\/spark-3.3-bigquery-0.30.0.jar\n\n# Show the table schema when we use `bq mk --table` and then load the avro\nbq query --use_legacy_sql=false \\\n  \"SELECT ddl FROM $DATASET_NAME.INFORMATION_SCHEMA.TABLES where table_name='$TABLE_NAME'\"\n\n# Remove the table so that we can see that the table is created should it not exist\nbq rm -f -t $PROJECT_ID:$DATASET_NAME.$TABLE_NAME\n\n# Dynamically generate a DataFrame, store it to avro, load that avro,\n# and write the avro to BigQuery, creating the table if it does not already exist\n\nspark-shell -i Main.scala \\\n  --jars \/usr\/lib\/spark\/external\/spark-avro.jar,gs:\/\/spark-lib\/bigquery\/spark-3.3-bigquery-0.30.0.jar\n\n# Show that the table schema does not differ from one created with a bq mk --table\nbq query --use_legacy_sql=false \\\n  \"SELECT ddl 
FROM $DATASET_NAME.INFORMATION_SCHEMA.TABLES WHERE table_name='$TABLE_NAME'\"\n<\/pre>\n<p>Google BigQuery has supported <a href=\"https:\/\/cloud.google.com\/bigquery\/docs\/json-data\">JSON data<\/a> since <a href=\"https:\/\/cloud.google.com\/bigquery\/docs\/release-notes#October_03_2022\">October 2022<\/a>, but until now it has not been possible to interact with these columns from generally available Dataproc clusters using the Spark BigQuery connector.<\/p>\n<p>JSON column type support was introduced in <a href=\"https:\/\/github.com\/GoogleCloudDataproc\/spark-bigquery-connector\/releases\/tag\/0.28.0\">spark-bigquery-connector release 0.28.0<\/a>.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Hello folks! We recently received a case letting us know that Dataproc 2.1.1 was unable to write to a BigQuery table with a column of type JSON.\n
Although the BigQuery connector for Spark has had support for JSON columns since 0.28.0, the Dataproc images on the 2.1 line still cannot create tables with JSON columns [&hellip;]<\/p>\n","protected":false},"author":2,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"jetpack_post_was_ever_published":false,"_jetpack_newsletter_access":"","_jetpack_dont_email_post_to_subs":false,"_jetpack_newsletter_tier_id":0,"_jetpack_memberships_contains_paywalled_content":false,"_jetpack_memberships_contains_paid_content":false,"footnotes":""},"categories":[319,320,318,313,60,165,17,79,316,317,39,204,102,166,20,184,8],"tags":[],"class_list":["post-1931","post","type-post","status-publish","format-standard","hentry","category-apache-hadoop","category-apache-spark","category-bigquery","category-bullseye","category-colliertech","category-databases","category-debian","category-free-software","category-gcp","category-google-cloud-support","category-java","category-javascript","category-open-source","category-software","category-sql","category-virtualization","category-work"],"jetpack_featured_media_url":"","jetpack_shortlink":"https:\/\/wp.me\/p1YDIB-v9","jetpack_sharing_enabled":true,"jetpack_likes_enabled":true,"_links":{"self":[{"href":"https:\/\/wp.c9h.org\/cj\/index.php?rest_route=\/wp\/v2\/posts\/1931","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/wp.c9h.org\/cj\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/wp.c9h.org\/cj\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/wp.c9h.org\/cj\/index.php?rest_route=\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"https:\/\/wp.c9h.org\/cj\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1931"}],"version-history":[{"count":4,"href":"https:\/\/wp.c9h.org\/cj\/index.php?rest_route=\/wp\/v2\/posts\/1931\/revisions"}],"predecessor-version":[{"id":1940,"href":"https:\/\/wp.c9
h.org\/cj\/index.php?rest_route=\/wp\/v2\/posts\/1931\/revisions\/1940"}],"wp:attachment":[{"href":"https:\/\/wp.c9h.org\/cj\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1931"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/wp.c9h.org\/cj\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1931"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/wp.c9h.org\/cj\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1931"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}