Orchestrating Dataproc jobs using Google Cloud Workflows

Benjamin B Beeshma · Published in Searce · 6 min read · Apr 12, 2024

Google Cloud Workflows is a fully managed orchestration service that helps you automate and manage complex business processes and workflows. It allows you to define and execute a series of tasks or steps in a reliable and scalable manner.
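
At its core, a workflow is a YAML (or JSON) definition made up of named steps that call HTTP endpoints, connectors, or other workflows. As a point of reference, a minimal sketch looks like this (the URL and step names are placeholders, not part of the solution below):

main:
  steps:
    - get_message:        # call a public HTTP endpoint
        call: http.get
        args:
          url: https://example.com/api/message   # placeholder URL
        result: api_response
    - return_result:      # return the response body as the workflow output
        return: ${api_response.body}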

Compared to dedicated workflow orchestration tools, Cloud Workflows currently offers a more limited feature set. For instance, it ships no ready-made operators or sensors for services like Dataproc, and its control flow is limited to basic branching, loops, and retry policies.

When building complex data pipelines that execute a series of interdependent Dataproc jobs, the orchestration framework needs to be able to trigger each job, monitor its execution, and send alerts when something goes wrong.

Unfortunately, there are no ready-to-use modules available to handle this task, so we need to build custom logic using the basic functionality Cloud Workflows provides.

Solution Overview

The process involves creating a main workflow that triggers a sub-workflow responsible for submitting and monitoring Dataproc jobs.

The main workflow passes necessary parameters such as project ID, region, cluster name, script URI, job arguments, job ID, job properties, and callback wait time to the sub-workflow.

The sub-workflow then submits the job to Dataproc, monitors its execution, and provides feedback on job success or failure.

Workflow Architecture

Parent Workflow

The parent workflow serves as the entry point and orchestrates the execution of Dataproc jobs. It receives input parameters and triggers the sub-workflow responsible for job submission and monitoring.

In case of job failure or success, appropriate actions are taken, such as sending alerts or returning status information.
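
The full parent workflow is not reproduced in this article, but a minimal sketch of the pattern might look like the following, assuming the sub-workflow has been deployed as its own workflow. The name dataproc-job-runner and the step names are illustrative, and the sketch assumes the Workflow Executions connector raises an exception when the child execution fails:

main:
  params: [input]
  steps:
    - run_dataproc_job:            # trigger the sub-workflow that submits and monitors the job
        try:
          call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
          args:
            workflow_id: dataproc-job-runner   # hypothetical name of the deployed sub-workflow
            argument: ${input}                 # pass the job parameters straight through
          result: job_result
        except:
          as: e
          steps:
            - handle_failure:      # e.g. log, notify, or re-raise so the caller sees the failure
                raise: ${e}
    - job_succeeded:               # the sub-workflow returned normally; surface its output
        return: ${job_result}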

Sub-Workflow

The sub-workflow is responsible for submitting Dataproc jobs and monitoring their execution. It utilizes HTTP requests to interact with the Dataproc API for job submission and status monitoring.

The workflow includes error handling to catch exceptions and retry transient failures, making job execution more robust.

[Figure: Sub-Workflow Architecture]
[Figure: Visualization of the Sub-Workflow — part 1]
[Figure: Visualization of the Sub-Workflow — part 2]

Sub-Workflow Breakdown

The provided sub-workflow consists of several key steps:

1. Set Variables

This step gathers user-defined inputs like region, cluster name, job script URI, arguments, and project ID from the parent workflow.

2. Trigger Dataproc Job

This step constructs the Dataproc job submission request using the gathered information.

It utilizes an HTTP POST request to the Dataproc Jobs API (jobs:submit) to initiate the PySpark job on the specified cluster.

The request includes details like the project ID, cluster name, main script file URI, job arguments, and properties.

OAuth2 authentication with Cloud Platform scope is used for authorization.

In case of exceptions during the trigger, an alert is sent with the error message.

3. Check Trigger Status

The sub-workflow checks the response code of the Dataproc job submission (jobTriggerResult).

If successful (code 200), it proceeds to wait for job completion. Otherwise, it jumps to the job trigger failed handling.

4. Wait for Job Completion (if triggered successfully)

This section continuously checks the job status using HTTP GET requests to the Dataproc Jobs API (jobs/{job_id}).

It implements a retry mechanism with exponential backoff in case of temporary failures.

Based on the job status response (an abridged example follows this list):

  • If the job finishes successfully (state is “DONE”), the workflow exits with the job status.
  • If the job encounters errors (state is “ERROR”) or gets canceled, it jumps to the job execution failed handling.
  • If the job is still running, it waits for a specified duration (callback_wait_secs) before re-checking the status.
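
For context, the fields the workflow inspects come from the Dataproc Job resource returned by the API. An abridged, hypothetical response body (rendered as YAML, showing only the fields the workflow reads) might look like this:

# Abridged, hypothetical Dataproc Job resource; only the fields used by the sub-workflow.
reference:
  projectId: example-project
  jobId: transform_sales_2c9d1f0a        # unique ID generated at submission time
placement:
  clusterName: example-dataproc-cluster
status:
  state: DONE                            # other values include PENDING, RUNNING, ERROR, CANCELLED
  stateStartTime: "2024-04-12T10:15:00Z"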

5. Job Trigger Failed Handling

In case the job submission fails, this section sends an alert with the error message using a user-provided Pub/Sub topic.

The workflow exits with the job trigger failure details.

6. Job Execution Failed Handling

If the job execution encounters errors or gets canceled, this section sends an alert with the job status details using a Pub/Sub topic.

The workflow exits with the job execution failure details.

7. Job Executed Successfully

If the job finishes successfully, the workflow exits with the final job status.

8. Send PubSub Alert (separate function)

This separate function is used for sending alerts to a Pub/Sub topic.

It takes the message and the input parameters (including the Pub/Sub topic name) as arguments, and reads the project ID from an environment variable.

It constructs the message payload and publishes it to the specified Pub/Sub topic.

Sub-Workflow Script

main:
  params: [input]
  steps:
    - set_variables: # 1. Set Variables
        assign:
          - project_id: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
          - region: ${input.region}
          - cluster_name: ${input.cluster_name}
          - script_uri: ${input.script_uri}
          - job_args: ${input.job_args}
          - job_id: ${input.job_id}
          - job_properties: ${input.job_properties}
          - callback_wait_secs: ${input.callback_wait_secs}
          - dataproc_api: ${"https://dataproc.googleapis.com/v1/projects/" + project_id + "/regions/" + region + "/"}
    - triggerDataprocPysparkJob: # 2. Trigger Dataproc Job
        try:
          call: http.post
          args:
            url: ${dataproc_api + "jobs:submit"}
            body:
              "projectId": ${project_id}
              "job": {
                "placement": {
                  "clusterName": '${cluster_name}'
                },
                "reference": {
                  "projectId": '${project_id}',
                  "jobId": '${job_id + "_" + uuid.generate()}'
                },
                "pysparkJob": {
                  "mainPythonFileUri": '${script_uri}',
                  "args": '${job_args}',
                  "properties": '${job_properties}'
                }
              }
            auth:
              type: OAuth2
              scopes: https://www.googleapis.com/auth/cloud-platform
          result: jobTriggerResult
        except:
          as: e
          steps:
            - exception_occurred_alert:
                call: send_pubsub_alert
                args:
                  input_params: ${input}
                  message: ${e}
                result: pubsub_result
            - exception_occurred_out:
                return: ${e}
    - check_trigger_status: # 3. Check Trigger Status
        switch:
          - condition: ${jobTriggerResult.code == 200}
            next: waitForJobCompletion
        next: job_trigger_failed
    - waitForJobCompletion: # 4. Wait for Job Completion
        steps:
          - checkJob:
              try:
                call: http.get
                args:
                  url: ${dataproc_api + "jobs/" + jobTriggerResult.body.reference.jobId}
                  auth:
                    type: OAuth2
                result: jobStatus
              retry:
                predicate: ${http.default_retry_predicate}
                max_retries: 3
                backoff:
                  initial_delay: 2
                  multiplier: 2
          - checkIfDone:
              switch:
                - condition: ${jobStatus.body.status.state == "DONE"}
                  next: job_executed_successfully
                - condition: ${jobStatus.body.status.state == "ERROR" or jobStatus.body.status.state == "CANCELLED"}
                  next: job_execution_failed
          - wait:
              call: sys.sleep
              args:
                seconds: ${callback_wait_secs}
              next: checkJob

    - job_trigger_failed: # 5. Job Trigger Failed Handling
        steps:
          - send_trigger_failed_alert:
              call: send_pubsub_alert
              args:
                input_params: ${input}
                message: ${jobTriggerResult}
              result: pubsub_result
          - job_trigger_failed_out:
              return: ${jobTriggerResult}

    - job_execution_failed: # 6. Job Execution Failed Handling
        steps:
          - send_failure_alert:
              call: send_pubsub_alert
              args:
                input_params: ${input}
                message: ${jobStatus}
              result: pubsub_result
          - job_execution_failed_out:
              return: ${jobStatus}

    - job_executed_successfully: # 7. Job Executed Successfully
        return: ${jobStatus}

send_pubsub_alert: # 8. Send PubSub Alert
  params: [input_params, message]
  steps:
    - init:
        assign:
          - project_id: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
          - pubsub_alert_topic: ${input_params.pubsub_alert_topic}
          - message.parent_workflow: ${input_params.parent_workflow}
          - message.job_id: ${input_params.job_id}
    - send_alert:
        call: googleapis.pubsub.v1.projects.topics.publish
        args:
          topic: '${"projects/" + project_id + "/topics/" + pubsub_alert_topic}'
          body:
            messages:
              - data: '${base64.encode(json.encode(message))}'
        result: output
    - return_out:
        return: ${output}

Integrating the Sub-Workflow

This sub-workflow can be incorporated into your main workflow either with a plain call step (if both are defined in the same source file) or, as in the example below, by invoking it as a separately deployed workflow through the Workflow Executions connector. It takes various parameters as input, including region, cluster name, job script URI, arguments, properties, the Pub/Sub topic for alerts, and the callback wait time.

To use this workflow, you need to provide the following input parameters:

  • region: The Google Cloud region where the Dataproc cluster is located.
  • cluster_name: The name of the Dataproc cluster.
  • script_uri: The Google Cloud Storage URI of the main Python script to be executed.
  • job_args (optional): A list of arguments to be passed to the PySpark job.
  • job_id: A unique identifier for the Dataproc job.
  • job_properties (optional): A dictionary of properties to be applied to the PySpark job.
  • callback_wait_secs: The number of seconds to wait between job status checks.
  • pubsub_alert_topic: The name of the Google Cloud Pub/Sub topic to publish alert messages.
  • parent_workflow (optional): The name of the parent workflow that triggered the Dataproc job.
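
For illustration, a hypothetical input payload (every value below is a placeholder) might look like this:

# Hypothetical argument map for the sub-workflow; all values are placeholders.
region: us-central1
cluster_name: example-dataproc-cluster
script_uri: gs://example-bucket/jobs/transform_sales.py
job_args: ["--input", "gs://example-bucket/raw/", "--output", "gs://example-bucket/curated/"]
job_id: transform_sales
job_properties: {"spark.executor.memory": "4g"}
callback_wait_secs: 60
pubsub_alert_topic: dataproc-job-alerts
parent_workflow: daily-sales-pipeline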

To use this sub-workflow in your parent workflow, you can call it with the required input parameters. For example:

- execute_job:
    call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
    args:
      workflow_id: ${subworkflow_id}
      argument: {
        "job_id": '${job_id}',
        "script_uri": '${script_uri}',
        "job_args": '${job_args}',
        "job_properties": '${job_properties}',
        "parent_workflow": '${workflow_id}',
        "pubsub_alert_topic": '${pubsub_alert_topic}',
        "callback_wait_secs": '${callback_wait_secs}',
        "cluster_name": '${cluster_name}',
        "region": '${region}'
      }
      connector_params:
        timeout: 7000
    result: execution_result

Error Handling

The workflow includes robust error handling mechanisms to address potential failures during job submission and execution. Exceptions are caught and appropriate actions are taken, such as sending alerts and retrying operations.
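
The script above relies on http.default_retry_predicate, which retries on common transient HTTP errors. If you only want to retry on specific error conditions, Workflows also accepts a custom predicate defined as a sub-workflow. A minimal sketch, with an illustrative name and status codes, could look like this:

# Hypothetical custom retry predicate: retry only on HTTP 429 (rate limited) and 503 (unavailable).
# Reference it from a try block via:  predicate: ${retry_on_transient_http}
retry_on_transient_http:
  params: [e]
  steps:
    - check_status_code:
        switch:
          - condition: ${"code" in e and (e.code == 429 or e.code == 503)}
            return: true
    - do_not_retry:
        return: false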

Alerting Mechanism

In case of job submission failure or execution error, the workflow triggers an alerting mechanism to notify relevant stakeholders. Alerts are sent via Google Cloud Pub/Sub, providing details about the failed job and any relevant context.

Conclusion

By following the outlined steps and best practices, you can efficiently orchestrate and monitor your Dataproc pipelines using Google Cloud Workflows, ensuring reliable and scalable job executions.
