Spark SQL, ThriftServer, GCS in Kubernetes.

HungWei Chiu
9 min readSep 23, 2023

Introduction

This article documents the installation of Spark SQL and Spark Thrift Server on Kubernetes within a Google Kubernetes Engine (GKE) environment. It outlines the use of Google Cloud Storage (GCS) as the data source for Spark SQL and the communication with Spark SQL through JDBC-compatible applications.

The purpose of this article is not to delve deeply into Spark SQL and Thrift Server but to provide a record of the commands and files used.

The architecture of this article is as follows:

  1. Use GKE as the foundation for the Kubernetes platform.
  2. Deploy Spark Thrift Server within GKE.
  3. Store the data to be queried in GCS.
  4. Communicate with Spark Thrift Server and execute SQL-related queries using Beeline CLI or other JDBC-compatible tools.
  5. Spark Thrift Server receives requests, generates corresponding Spark Executors, retrieves relevant data from GCS, and responds accordingly.

The entire architecture is illustrated in the diagram below.

GCS

To enable Spark to access data on Google Cloud Storage (GCS), we need to create a Service Account and grant it the necessary permissions.

Since Spark does not currently support Workload Identity, the Service Account must generate a corresponding credential.json file in JSON format. This file should then be mounted within Kubernetes to allow Spark Executors to interact with GCS using it.

The process is as follows:

  1. Create a Service Account.
  2. Assign an IAM Role to the Service Account for GCS read and write access.
  3. Generate a corresponding Credential Key.
  4. Write the Credential to Kubernetes.
$ gcloud iam service-accounts --project my-project create spark-example --description="For Spark To Access GCS" --display-name=spark-example
$ gcloud projects add-iam-policy-binding my-project --member="serviceAccount:spark-example@my-project.iam.gserviceaccount.com" --role="roles/storage.admin"
$ gcloud iam service-accounts --project my-project keys create spark_test.json --iam-account=spark-example@my-project.iam.gserviceaccount.com
$ kubectl create secret generic gcs-sa --from-file=gcp-credentials.json=spark_test.json

After executing these steps, you will have a Kubernetes secret named “gcs-sa.” When deploying Spark, make sure to mount this secret within the environment and specify the use of “gcp-credentials.json

Spark

Since Spark Server dynamically creates Spark Executors (Pods), it requires a Kubernetes Service Account to obtain permissions. Below is a YAML configuration based on RBAC rules that prepare the required permissions and assign them to a Service Account named “spark.”

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: spark-server
rules:
- apiGroups: [""]
resources: ["pods", "persistentvolumeclaims", "configmaps", "services"]
verbs: ["get", "deletecollection", "create", "list", "watch", "delete"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: spark
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: spark-rolebinding
subjects:
- kind: ServiceAccount
name: spark
namespace: dev
roleRef:
kind: Role
name: spark-server
apiGroup: rbac.authorization.k8s.io

Once everything is set up, the next step is to prepare a StatefulSet for deploying Spark Thrift Server. The official Spark image currently does not include a Connector for communicating with GCS, so an additional installation of the Google Cloud Storage Connector for Spark is necessary.

There are two approaches:

  1. Rebuild the Spark image to include the required files directly.
  2. Use an Init Container to download the file and share it with the main Spark Container via a Volume, placing it in the /opt/spark/jars directory for initialization during Thrift Server startup.

Here, we demonstrate approach (2) for downloading “hadoop3–2.2.16-shared.jar.” While this method may have lower efficiency, it offers flexibility. If version requirements change, you can simply modify the YAML without the need to rebuild the Container Image.

initContainers:
- name: download-file
image: busybox
command: ["sh", "-c", "if [ ! -e /tmp/gcs-connector-hadoop3-2.2.16-shaded.jar ]; then wget -O /tmp/gcs-connector-hadoop3-2.2.16-shaded.jar https://github.com/GoogleCloudDataproc/hadoop-connectors/releases/download/v2.2.16/gcs-connector-hadoop3-2.2.16-shaded.jar; fi"]
volumeMounts:
- name: data-volume
mountPath: /tmp
volumes:
- name: data-volume
emptyDir: {}
containers:
- name: thrift-server
image: apache/spark:3.4.0
volumeMounts:
- name: data-volume
mountPath: /app/data
command:
- 'bash'
- '-c'
- >-
cp /app/data/gcs-connector-hadoop3-2.2.16-shaded.jar /opt/spark/jars/ &&
/opt/spark/sbin/start-thriftserver.sh
--jars /app/data/gcs-connector-hadoop3-2.2.16-shaded.jar
--packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.16,org.apache.spark:spark-hadoop-cloud_2.12:3.4.0

Additionally, ensure that the previously mentioned Service Account and GCS-related permissions are integrated into the environment.

serviceAccountName: spark
volumes:
- secret:
secretName: gcs-sa
name: gcs-sa
containers:
- name: thrift-server
image: apache/spark:3.4.0
volumeMounts:
- name: gcs-sa
mountPath: /etc/secrets
readOnly: true
command:
--conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
--conf spark.hadoop.google.cloud.auth.service.account.enable=true
--conf spark.hadoop.fs.gs.project.id=my-project
--conf spark.hadoop.google.cloud.auth.service.account.json.keyfile=/etc/secrets/gcp-credentials.json

Next, configure settings related to Hive Server. In this example, we deploy a local Hive Metastore, using Kubernetes PVC to store Hive data.

volumeClaimTemplates:
- metadata:
name: spark-data
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 100Gi
containers:
- name: thrift-server
image: apache/spark:3.4.0
volumeMounts:
- name: gcs-sa
mountPath: /etc/secrets
readOnly: true
- name: spark-data
mountPath: /opt/spark/work-dir
command:
--hiveconf hive.server2.thrift.port=10000
--hiveconf hive.server2.thrift.bind.port=0.0.0.0
--conf spark.kubernetes.driver.ownPersistentVolumeClaim=true
--conf spark.kubernetes.driver.reusePersistentVolumeClaim=true

Finally, fine-tune some Kubernetes-related parameters. Detailed settings can be found in “Running Spark on Kubernetes.

apiVersion: v1
kind: Service
metadata:
name: spark-thrift-service
spec:
clusterIP: None
selector:
app: spark-thrift-server
ports:
- name: thrift-server-port
protocol: TCP
port: 10000
targetPort: 10000
- protocol: TCP
name: spark-driver-port
port: 7078
targetPort: 7078
- protocol: TCP
name: spark-ui-port
port: 4040
targetPort: 4040
---
containers:
- name: thrift-server
image: apache/spark:3.4.0
command:
--master k8s://https://kubernetes.default.svc.cluster.local:443
--conf spark.dynamicAllocation.enabled=true
--conf spark.kubernetes.container.image=apache/spark:v3.4.0
--conf spark.kubernetes.driver.pod.name=spark-thrift-server-0
--conf spark.kubernetes.executor.request.cores="500m"
--conf spark.kubernetes.executor.request.memory="1g"
--conf spark.kubernetes.executor.secrets.gcs-sa=/etc/secrets
--conf spark.kubernetes.namespace=dev
--conf spark.driver.host=spark-thrift-service
--conf spark.driver.bindAddress=spark-thrift-server-0
--conf spark.driver.port=7078
&& tail -f /opt/spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-spark-thrift-server-0.out

The above configurations include:

  • Specifying the Kubernetes API Server location.
  • Mounting the “gcs-sa” secret into the environment at /etc/secrets for Executors.
  • Creating a Service to provide network access for future applications.
  • Using the “dev” namespace.
  • Redirecting Thrift Server logs, which are by default written to files, using “tail.”

After organizing all these configurations, you will have a YAML file.

apiVersion: v1
kind: Service
metadata:
name: spark-thrift-service
spec:
clusterIP: None
selector:
app: spark-thrift-server
ports:
- name: thrift-server-port
protocol: TCP
port: 10000
targetPort: 10000
- protocol: TCP
name: spark-driver-port
port: 7078
targetPort: 7078
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: spark-thrift-server
spec:
serviceName: spark-thrift-service
replicas: 1
selector:
matchLabels:
app: spark-thrift-server
volumeClaimTemplates:
- metadata:
name: spark-data
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 100Gi
template:
metadata:
labels:
app: spark-thrift-server
spec:
serviceAccountName: spark
initContainers:
- name: download-file
image: busybox
command: ["sh", "-c", "if [ ! -e /tmp/gcs-connector-hadoop3-2.2.16-shaded.jar ]; then wget -O /tmp/gcs-connector-hadoop3-2.2.16-shaded.jar https://github.com/GoogleCloudDataproc/hadoop-connectors/releases/download/v2.2.16/gcs-connector-hadoop3-2.2.16-shaded.jar; fi"]
volumeMounts:
- name: data-volume
mountPath: /tmp
volumes:
- secret:
secretName: gcs-sa
name: gcs-sa
- name: data-volume
emptyDir: {}
containers:
- name: thrift-server
image: apache/spark:3.4.0
volumeMounts:
- name: gcs-sa
mountPath: /etc/secrets
readOnly: true
- name: data-volume
mountPath: /app/data
- name: spark-data
mountPath: /opt/spark/work-dir
command:
- 'bash'
- '-c'
- >-
cp /app/data/gcs-connector-hadoop3-2.2.16-shaded.jar /opt/spark/jars/ &&
/opt/spark/sbin/start-thriftserver.sh
--master k8s://https://kubernetes.default.svc.cluster.local:443
--jars /app/data/gcs-connector-hadoop3-2.2.16-shaded.jar
--packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.16,org.apache.spark:spark-hadoop-cloud_2.12:3.4.0
--hiveconf hive.server2.thrift.port=10000
--hiveconf hive.server2.thrift.bind.port=0.0.0.0
--conf spark.dynamicAllocation.enabled=true
--conf spark.kubernetes.container.image=apache/spark:v3.4.0
--conf spark.kubernetes.driver.pod.name=spark-thrift-server-0
--conf spark.kubernetes.driver.ownPersistentVolumeClaim=true
--conf spark.kubernetes.driver.reusePersistentVolumeClaim=true
--conf spark.kubernetes.executor.request.cores="500m"
--conf spark.kubernetes.executor.request.memory="1g"
--conf spark.kubernetes.executor.secrets.gcs-sa=/etc/secrets
--conf spark.kubernetes.namespace=dev
--conf spark.driver.host=spark-thrift-service
--conf spark.driver.bindAddress=spark-thrift-server-0
--conf spark.driver.port=7078
--conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
--conf spark.hadoop.google.cloud.auth.service.account.enable=true
--conf spark.hadoop.fs.gs.project.id=my-project
--conf spark.hadoop.google.cloud.auth.service.account.json.keyfile=/etc/secrets/gcp-credentials.json&& tail -f /opt/spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-spark-thrift-server-0.out
&& tail -f /opt/spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-spark-thrift-server-0.out

Deploy this file into the environment to observe the deployment status.

$ kubectl -n dev logs -f spark-thrift-server-0
...
23/09/23 01:58:13 INFO AbstractService: Service:ThriftBinaryCLIService is started.
23/09/23 01:58:13 INFO ThriftCLIService: Starting ThriftBinaryCLIService on port 10000 with 5...500 worker threads
23/09/23 01:58:13 INFO AbstractService: Service:HiveServer2 is started.
23/09/23 01:58:13 INFO HiveThriftServer2: HiveThriftServer2 started
...

When you see “HiveThriftServer2 started,” it indicates that the deployment is complete, and you can proceed with environment verification.

Verification

To verify access, let’s start by creating a GCS (Google Cloud Storage) bucket to store data.

gcloud storage buckets create gs://hungwei_spark_test

Since we’ll be using Spark SQL syntax for operations, the data itself should contain:

  • A database/table structure.
  • A schema.
  • Data.

We will use the beeline CLI to demonstrate two scenarios:

  1. Creating a brand new table and data.
  2. Accessing pre-existing data.

First, create a tunnel using “kubectl port-forward” and then access it.

$ kubectl port-forward -n dev --address 0.0.0.0 svc/spark-thrift-service 10000:10000

Connecting via beeline (or dbeaver) to access pre-existing data.

$ ./beeline -u jdbc:hive2://localhost:10000

Additionally, because we configured “spark.dynamicAllocation.enabled=true” during installation, each execution will dynamically generate Pods for processing. Therefore, the first execution may take some time.

Case 1

The demonstration process is as follows:

  1. Use a GCS folder to create a new database and configure the associated schema.
  2. Write data
  3. Read data

When creating the table, be sure to include the “OPTIONS (path ‘gs://$bucket_name/$folder’)” to specify the GCS path.

0: jdbc:hive2://localhost:10000> CREATE TABLE test (id int, name string) OPTIONS (path 'gs://hungwei_spark_test/case1');    
0: jdbc:hive2://localhost:10000> INSERT INTO TABLE test VALUES (1234, 'test');
0: jdbc:hive2://localhost:10000> INSERT INTO TABLE test VALUES (2345, 'test2');
0: jdbc:hive2://localhost:10000> INSERT INTO TABLE test VALUES (1234, 'test3');
0: jdbc:hive2://localhost:10000> INSERT INTO TABLE test VALUES (1234, 'test3');
0: jdbc:hive2://localhost:10000> INSERT INTO TABLE test VALUES (5678, 'test3');
0: jdbc:hive2://localhost:10000> select * from test where name="test3";
+-------+--------+
| id | name |
+-------+--------+
| 5678 | test3 |
| 1234 | test3 |
| 1234 | test3 |
+-------+--------+
3 rows selected (3.415 seconds)

During the execution of the above commands, you can observe the creation of related Pods.

thrift-jdbc-odbc-server-7e310a8abfc209d1-exec-7   1/1     Running             0             1s
thrift-jdbc-odbc-server-7e310a8abfc209d1-exec-6 0/1 Completed 0 5m28s
thrift-jdbc-odbc-server-7e310a8abfc209d1-exec-6 0/1 Terminating 0 5m28s
thrift-jdbc-odbc-server-7e310a8abfc209d1-exec-6 0/1 Terminating 0 5m30s
thrift-jdbc-odbc-server-7e310a8abfc209d1-exec-6 0/1 Terminating 0 5m30s
thrift-jdbc-odbc-server-7e310a8abfc209d1-exec-6 0/1 Terminating 0 5m30s
thrift-jdbc-odbc-server-7e310a8abfc209d1-exec-7 0/1 Completed 0 65s
thrift-jdbc-odbc-server-7e310a8abfc209d1-exec-7 0/1 Terminating 0 65s
thrift-jdbc-odbc-server-7e310a8abfc209d1-exec-7 0/1 Terminating 0 67s
thrift-jdbc-odbc-server-7e310a8abfc209d1-exec-7 0/1 Terminating 0 67s
thrift-jdbc-odbc-server-7e310a8abfc209d1-exec-7 0/1 Terminating

Check the status within GCS to see some files created to describe the data.

$ gsutil list gs://hungwei_spark_test/case1
gs://hungwei_spark_test/case1/
gs://hungwei_spark_test/case1/part-00000-26846a09-e18e-4d38-8b2a-c7c005e2c7e8-c000
gs://hungwei_spark_test/case1/part-00000-29411642-fd0b-4c1c-bcad-fe6f77adef53-c000
gs://hungwei_spark_test/case1/part-00000-34bc8daf-3944-4156-8ff9-f8d2839f6a9f-c000
gs://hungwei_spark_test/case1/part-00000-7ca852d0-97e1-4c06-a3e1-e329c1bfebde-c000
gs://hungwei_spark_test/case1/part-00000-8a105d8b-f314-4763-ba5e-d82ff00506bf-c000

You can also access the Spark UI by port-forwarding to port 4040 using the same method.

Case 2

In the second example, we access pre-existing data. Therefore, we’ll use the “gsutil” tool to upload data to the “case2” folder within the bucket. There are a total of three files, each with the same format (int, string), as shown below:

...
287, 341feba68a7c515288ba
288, 7db6394632c5ee7c2bc3
289, 5c2fb14de85ac40013bc
290, 831596bd2c3051aa128e
291, 673c62da4b7b0eee8efd
292, 46107b341ed115c03ac2
293, 140fde027a05d316fa95
294, ba0760ff44610f797ad0
...

Upload the files to the “case2” folder using “gsutil.”

$ gsutil cp data*   gs://hungwei_spark_test/case2/
Copying file://data1 [Content-Type=application/octet-stream]...
Copying file://data2 [Content-Type=application/octet-stream]...
Copying file://data3 [Content-Type=application/octet-stream]...
| [3 files][ 2.9 MiB/ 2.9 MiB]
Operation completed over 3 objects/2.9 MiB.

$ gsutil ls gs://hungwei_spark_test/case2/
gs://hungwei_spark_test/case2/data1
gs://hungwei_spark_test/case2/data2
gs://hungwei_spark_test/case2/data3

Once the files are ready, return to the beeline CLI interface. This time, we’ll use “EXTERNAL TABLE” along with other variables to describe the file format, as shown below:

CREATE EXTERNAL TABLE case2 (id int, name string) row format delimited fields terminated by ‘,’ stored as textfile OPTIONS (path ‘gs://hungwei_spark_test/case2’);

0: jdbc:hive2://localhost:10000> CREATE EXTERNAL TABLE case2 (id int, name string) row format delimited fields terminated by ',' stored as textfile OPTIONS (path 'gs://hungwei_spark_test/case2');  
0: jdbc:hive2://localhost:10000> select * from case2;
....
| 111291 | 3b8b2b1eca0561d4ab62 |
| 111292 | 01a20fc8e8f91984e447 |
| 111293 | ecf8d25c0ed6f8576f96 |
| 111294 | 558f78477c1b2151f6e9 |
| 111295 | b5ae29bda237add37650 |
| 111296 | ea2caeabbf3559a6cdea |
| 111297 | 0d56273274b4012f690f |
| 111298 | 20a25f019018272013fd |
+-------+------------------------+
| id | name |
+-------+------------------------+
| 111299 | dead09f62d571453339e |
| 111300 | 3ab89d041368f1717543 |
+-------+------------------------+
106,902 rows selected (51.873 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from case2;
+-----------+
| count(1) |
+-----------+
| 106902 |
+-----------+
1 row selected (42.869 seconds)

Summary

  • Spark does not natively support GCS and requires the installation of the relevant Connector.
  • Spark does not support Workload Identity and requires the creation of a Service Account with proper permissions and the generation of relevant keys.
  • Spark on Kubernetes offers various parameters for fine-tuning.

--

--

Responses (1)