Trigger Dataflow pipelines with Cloud Functions written in Clojurescript

Torbjørn Vatn · Published in unacastlabs · Mar 10, 2017


What’s the challenge?

Here at Unacast we utilize Cloud Dataflow from Google quite extensively, and we have both batch-based and streaming pipelines. While the streaming pipelines are started on deploy and stream messages from PubSub, the batch pipelines need to be created and started by some external system. This is typically an application that either has some CRON-like schedule or that listens for changes in a Cloud Storage bucket. I must admit it feels a bit cumbersome to set up a separate app for triggers like this, as the Dataflow code itself is uploaded and run in its entirety in the Cloud.

Wouldn’t it be nice if we could use some Google Cloud hosted service for these triggers as well?

Enter App Engine CRON Service and Cloud Functions

This blog post by Google demonstrates how one can use App Engine's CRON functionality to trigger Dataflow periodically, or Cloud Functions to start pipelines when a file is uploaded or changed in a Cloud Storage bucket. The latter was exactly what I needed for my latest Dataflow project, so I set out to create a POC of this approach. The rest of this post is a summary of what I discovered.

Prerequisites

The setup consists of three moving parts:

  • A self-executable Dataflow pipeline jar file.
  • A Cloud Storage bucket where files get added periodically by a partner of ours.
  • A Cloud Function configured to run each time something changes in that bucket.

I had already written the actual Dataflow pipeline code in Clojure using Datasplash, and I'll refer to that as pipeline-standalone.jar in the code examples later on. Since I was already in Clojure mode with the pipeline code, I decided to try writing the Cloud Function in Clojurescript instead of vanilla Javascript. My colleague Martin had already proven that you can write such functions in several different languages that compile to Javascript, including Clojurescript.

Generating index.js and package.json

Since Clojurescript is a language that compiles to Javascript, we'll have to start by setting up the tools to do the code generation. Here's a simplification of how my Leiningen project.clj file looks:
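The original embed isn't reproduced here, but a minimal sketch of what such a project.clj could look like is below. The plugin and dependency versions, the output paths and the node-jre version are illustrative assumptions, not the exact values from my project:

(defproject pipeline-example "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [org.clojure/clojurescript "1.9.473"]]
  :plugins [[lein-cljsbuild "1.1.5"]
            [lein-npm "0.6.2"]]
  ;; npm dependencies that lein-npm writes into the generated package.json
  :npm {:dependencies [[node-jre "0.2.3"]]
        ;; assumed: place package.json next to the compiled index.js
        :root "target/pipeline-example"}
  :cljsbuild {:builds [{:id "pipeline-example"
                        :source-paths ["src"]
                        :compiler {:main pipeline-example.trigger
                                   :output-to "target/pipeline-example/index.js"
                                   :target :nodejs
                                   :optimizations :simple}}]})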

Now I can run lein do npm install, cljsbuild once to install my npm dependencies, generate a package.json and compile the index.js file.
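Concretely, with the paths assumed in the project.clj sketch above, the build step amounts to something like this:

lein do npm install, cljsbuild once
# npm install    -> writes package.json and installs the :npm dependencies
# cljsbuild once -> compiles the Clojurescript sources to target/pipeline-example/index.js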

What ends up in the index.js file is defined in some Clojurescript that looks like this:
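The original gist isn't reproduced here, but a condensed sketch of the idea could look roughly like this. The namespace, the jar path, the main class name and the pipeline arguments are illustrative assumptions, and the node-jre call follows its documented spawn(classpath, classname, args) signature:

(ns pipeline-example.trigger
  (:require [cljs.nodejs :as nodejs]))

(nodejs/enable-util-print!)

(defonce jre (nodejs/require "node-jre"))

(defn execute-jar-file
  "Spawns a java child process that runs the pipeline jar for the given file."
  [file-name bucket callback]
  (let [proc (.spawn jre
                     #js ["pipeline-standalone.jar"]                     ;; classpath
                     "pipeline_example.core"                             ;; main class (assumed)
                     #js ["--input" (str "gs://" bucket "/" file-name)])] ;; pipeline args (assumed)
    ;; pipe the child process output into the Cloud Function log
    (-> proc .-stdout (.on "data" #(println (str %))))
    (-> proc .-stderr (.on "data" #(println (str %))))
    ;; signal GCF that we're done once the pipeline has been submitted
    (-> proc (.on "close"
                  (fn [code]
                    (if (zero? code)
                      (callback nil "Dataflow pipeline submitted")
                      (callback (js/Error. (str "java exited with code " code)))))))))

(defn pipeline-example-trigger
  "Entry point invoked by GCF when something changes in the bucket."
  [raw-event callback]
  (let [event (js->clj raw-event :keywordize-keys true)
        {:keys [name bucket]} (:data event)]
    (execute-jar-file name bucket callback)))

;; export the entry point so GCF can find it in the compiled index.js
(set! (.-exports js/module)
      #js {:pipeline-example-trigger pipeline-example-trigger})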

It’s a bit of a mouthful, but the main takeaways are:

  • The pipeline-example-trigger function has to be exported to act as the entry point used by GCF.
  • The incoming raw-event that the function receives when something happens in the bucket gets parsed, and the name and bucket fields are extracted.
  • The execute-jar-file function uses node-jre to spawn a java process that starts the pipeline-standalone.jar executable with the necessary arguments.
  • The -> proc parts are there to handle logging and events from the child process, and to make sure the Cloud Function callback gets called when the java process is done submitting the Dataflow pipeline.

Deploying the function to GCF

Now we’re ready to deploy the function to see it in action. I use the Google Cloud CLI to deploy and start the function on GCF, like this:

gcloud beta functions deploy pipeline-example-trigger \
  --local-path target/pipeline-example \
  --stage-bucket [STAGE BUCKET] \
  --trigger-bucket [BUCKET TO WATCH]

The --local-path and --stage-bucket arguments are the locations the function's source should be copied from and to, respectively. --trigger-bucket is the GCS bucket this function should watch for changes.

The proof is in the pudding

The function is up and running, and as you can see on the right of the graph, it received an event when I used the CLI to simulate a file change in the bucket it is watching.

gcloud beta functions call pipeline-example-trigger \
  --data '{"name": "file-name", "bucket": "bucket-name"}'

And the logging works as expected.

Closing thoughts

Google Cloud Functions had just entered beta when this post was written, so understandably it has some rough edges and some missing features. For example, I'd like a way of providing my own zip file to deploy instead of having to point to a directory with sources, because that would make using a CI/CD service like CircleCI a lot easier. Being able to write functions in a JVM language instead of just Javascript would also make the exercise I've done here, albeit fun and interesting, unnecessary.

The turnaround for deploying a function to test it is a bit long, especially when you include a quite large jar file like the one here. Luckily Google has created an emulator that allows us to test the functions locally before deploying them to the cloud. The fact that I developed this in Clojurescript, which has a great REPL for interactive development, also limited the need for actually deploying the code to test it.
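For completeness, local testing with the emulator looks roughly like the sketch below. I'm going from memory on the emulator's CLI here, so treat the exact commands and flags as assumptions rather than a verified recipe:

# install and start the local emulator (package name from the beta period)
npm install -g @google-cloud/functions-emulator
functions start
# deploy the compiled function to the emulator, then simulate a bucket event
functions deploy pipeline-example-trigger --local-path target/pipeline-example --trigger-bucket [BUCKET TO WATCH]
functions call pipeline-example-trigger --data '{"name": "file-name", "bucket": "bucket-name"}'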

My prediction is that we here at Unacast are going to use Google Cloud Functions to simplify and get rid of a lot of complex code in the months to come. Exciting times ahead!
