What’s the challenge?

Here at Unacast we use Cloud Dataflow from Google quite extensively, and we have both batch-based and streaming pipelines. While the streaming pipelines are started on deploy and stream messages from PubSub, the batch pipelines need to be created and started by some external system. This is typically an application that either has some CRON-like schedule or that listens for changes in a Cloud Storage bucket. I must admit it feels a bit cumbersome to set up a separate app for triggers like this, as the Dataflow code itself is uploaded and run in its entirety in the cloud.

Wouldn’t it be nice if we could use some Google Cloud hosted service for these triggers as well?

Enter App Engine CRON Service and Cloud Functions

This blog post by Google demonstrates how one can use App Engine’s CRON functionality to trigger Dataflow periodically, or Cloud Functions to start pipelines when a file is uploaded or changed in a Cloud Storage bucket. The latter was exactly what I needed for my latest Dataflow project, so I set out to create a POC of this approach. The rest of this post is a summary of what I discovered.

Prerequisites

The setup consists of three moving parts:

  • A self-executable Dataflow pipeline jar file.
  • A Cloud Storage bucket where files are added periodically by a partner of ours.
  • A Cloud Function configured to run each time something changes in that bucket.

I had already written the actual Dataflow pipeline code in Clojure using Datasplash, and I’ll refer to it as pipeline-standalone.jar in the code examples later on. Since I was already in Clojure mode with the pipeline code, I decided to try writing the Cloud Function in Clojurescript instead of vanilla Javascript. My colleague Martin had already proven that you can write such functions in several different languages that compile to Javascript, including Clojurescript.

Generating index.js and package.json

Since Clojurescript is a language that compiles to Javascript, we’ll have to start by setting up the tools to do the code generation. Here’s a simplification of how my Leiningen project.clj file looks (plugin and dependency versions below are only illustrative).
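
(defproject pipeline-example "0.1.0"
  ;; Versions and paths are illustrative, not the exact ones from the project.
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [org.clojure/clojurescript "1.9.521"]]
  :plugins [[lein-cljsbuild "1.1.5"]
            [lein-npm "0.6.2"]]
  ;; npm dependencies end up in the generated package.json
  :npm {:dependencies [[node-jre "0.2.3"]]}
  ;; compile the Clojurescript source to a Node-compatible index.js
  :cljsbuild {:builds [{:source-paths ["src"]
                        :compiler {:output-to "target/pipeline-example/index.js"
                                   :output-dir "target/out"
                                   :target :nodejs
                                   :optimizations :simple}}]})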

Now I can run lein do npm install, cljsbuild once to install my npm dependencies, generate a package.json and compile the index.js file.

What ends up in the index.js file is defined in some Clojurescript that looks roughly like this (condensed here, with the main class and pipeline arguments as placeholders).
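
(ns pipeline-example.core
  (:require [goog.object :as gobj]))

(def jre (js/require "node-jre"))

(defn execute-jar-file [name bucket callback]
  ;; Spawn a java child process via node-jre with the uberjar on the classpath.
  ;; The main class and pipeline arguments are illustrative.
  (let [proc (.spawn jre
                     #js ["pipeline-standalone.jar"]
                     "pipeline_example.core"
                     #js [(str "--input=gs://" bucket "/" name)
                          "--runner=DataflowPipelineRunner"])]
    (-> proc
        (.on "exit" (fn [code]
                      (js/console.log "java exited with code" code)
                      (callback))))
    (-> proc .-stdout (.on "data" #(js/console.log (str %))))
    (-> proc .-stderr (.on "data" #(js/console.error (str %))))))

(defn pipeline-example-trigger [raw-event callback]
  ;; A change in the watched bucket triggers this function. The file name
  ;; and bucket are found in the data field of the event.
  (let [data   (.-data raw-event)
        name   (.-name data)
        bucket (.-bucket data)]
    (js/console.log "Triggered by" name "in" bucket)
    (execute-jar-file name bucket callback)))

;; Export the function so it can act as the entry point used by GCF.
(gobj/set js/exports "pipeline-example-trigger" pipeline-example-trigger)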

It’s a bit of a mouthful, but the main takeaways are:

  • The pipeline-example-trigger function has to be exported to act as the entry point used by GCF.
  • The incoming raw-event that the function receives when something happens in the bucket gets parsed, and the name and bucket fields are extracted.
  • The execute-jar-file function uses node-jre to spawn a java process that starts the pipeline-standalone.jar executable with the necessary arguments.
  • The -> proc parts are there to handle logging and events from the child process, and to make sure the Cloud Function callback gets called when the java process is done submitting the Dataflow pipeline.

Deploying the function to GCF

Now we’re ready to deploy the function to see it in action. I use the Google Cloud CLI to deploy and start the function on GCF, like this:

gcloud beta functions deploy pipeline-example-trigger \
    --local-path target/pipeline-example \
    --stage-bucket [STAGE BUCKET] \
    --trigger-bucket [BUCKET TO WATCH]

The --local-path and --stage-bucket arguments are the locations the function’s source should be copied from and to. --trigger-bucket is the GCS bucket this function should watch for changes.

The proof is in the pudding

Running function

The function is up and running, and as you can see on the right of the graph, it received an event when I used the CLI to simulate a file change in the bucket it is watching.

gcloud beta functions call pipeline-example-trigger \
    --data '{"name": "file-name", "bucket": "bucket-name"}'

And the logging works as expected.

Log

Closing thoughts

Google Cloud Functions had just entered beta when this post was written, so understandably it has some rough edges and missing features. For example, I’d like a way of providing my own zip file to deploy, instead of having to point to a directory with sources, because that would make using a CI/CD service like CircleCI a lot easier. Being able to write functions in a JVM language instead of just Javascript would also make the exercise I’ve done here, albeit fun and interesting, unnecessary.

The turnaround for deploying a function to test it is a bit long, especially when you include a rather large jar file like the one here. Luckily Google has created an emulator that allows us to test functions locally before deploying them to the cloud. The fact that I developed this in Clojurescript, which has a great REPL for interactive development, also limited the need to actually deploy the code to test it.

My prediction is that we here at Unacast are going to use Google Cloud Functions to simplify and get rid of a lot of complex code in the months to come. Exciting times ahead!

You’ve already met Lars. Let me introduce Jarle Fosen, also one of our recent hires.

Jarle works as a Platform Engineer at Unacast whilst fighting his way to the top as the funniest person in all of Unacast - constantly. He believes, as he has been told by many, that quantity trumps quality when it comes to telling jokes and funny stories. He might not hit gold every time, but once in a while people laugh.

Jarle

Jarle, who are you?

My name is Jarle, and I’m 25 years old. Born and raised in Karmøy mainland (important distinction), where it usually rains and we have summer a couple of days a year. The rest of the year we have autumn. I’ve been living in Oslo for the past 5 and a half years, where I have been studying Robotics and Artificial Intelligence at the University of Oslo and worked at a startup. For the last 6 months I’ve been having the time of my life with my colleagues and friends at Unacast.

I like fixing everything. Broken printer? Faulty Wi-Fi? I’m on it! If I can automate anything along the way, or create a better experience for you than you already had, I will do it.

In my spare time I probably watch a bit too much TV, and I’m really good at not working out.

What did you do before you started at Unacast?

I studied at the University of Oslo and worked at a startup where I was the main developer for an Android application.

Why did you start working at Unacast?

I knew who some of the people at Unacast were from before, and when I got a call to grab a cup of coffee, I immediately knew that these were the people that I wanted to work with. I had a good feeling that what I could bring to the company would have value, but more importantly that I would be able to learn so much from my colleagues while having a lot of laughs.

To become the best at something is my dream. What it is I don’t know yet, but I was certain that this would be a step in the right direction if I were to reach that dream.

What kind of problems do you solve, and what are the tools you use?

Different problems need different tools. I feel like a potato: you can use me pretty much everywhere.

What I find very fascinating is creating data pipelines using Google’s Pub/Sub together with Dataflow. You boil transformations down into clear and precise functions that each do one specific thing, some magic happens, and suddenly, at the other end of the pipeline, you have the data you’re looking for! Completely automated, always doing what you want, always working.

My main strength is Java, which comes from working with Android app development and creating backend services in the Play! Framework. After joining Unacast I’ve been exposed to Clojure and Golang, and I can see myself creating the future in any of those programming languages.

But to solve any problem you need an editor, and for that I use the Swiss Army Knife called Emacs.

What is the most fun thing about working at Unacast?

Our humor. Definitely.

There hasn’t been a single day at work where I haven’t laughed, and I don’t think that day will come. We can go from heated, passionate discussions one moment to telling jokes and stories the next, once the discussion has run its course. I think it’s healthy, and I very much enjoy it.

What will you gain from working at Unacast?

Huge amounts of knowledge in development, system architecture and problem solving. I will also learn what it is like to work in a small company that grows into a huge one, and how we all together build and retain the company culture that we are proud of having today, when we are 20 people, and in the near future when we are 200.

It’s an exciting time to be a part of Unacast!

Introduction

There are two main criteria one should look for when choosing a database migration framework: it should be simple, and it should always roll forward. When we began looking for a migration framework to use in one of our Go applications, we could not find one that was simple in the sense that it could read migrations from files as well as from pre-compiled assets (which we tend to use at Unacast), and that only rolls forward. As a result we ended up implementing our own simple migration framework for Go applications, which I will guide you through in this post.

TL;DR: Grab the code on GitHub.

Why you should not roll back a database

For some it might sound a little odd to say that you should never roll back a database, but let us think about what a rollback is for a moment. While you think about that, I will define what I mean by rollforward and rollback.

  • Rollforward: an action in which you take your database from one state to another
  • Rollback: an action in which you take your database from one state to another

The observant reader will notice that the definitions of the two actions are quite similar, or wait, they are the same. This means that instead of having a rollback you can just write a new rollforward if you are in a situation where a database update has gone wrong.

Another reason is that database rollbacks are complicated: there are a lot of things you need to consider when writing one, and you most likely will not consider them all. Let me show an example of what most people would do when writing a new migration that adds a column:

  1. Write a migrate-up that adds column X
  2. Write a migrate-down that removes column X
  3. Run the migration

So far so good. If step 3 fails you can just roll back right away, no stress, but this could also be solved by using a transaction when you run the migration.

A more complicated scenario, and probably a more realistic one as well, is that the migration went well but a couple of hours later the database starts to act weird. What do you do? Should you run the rollback? Probably not. At this point you have already gotten some data into the new column X, data you most likely want to keep, so the solution is not simply to remove what you just added. Instead you should write a new migration where you define what to do without losing any data. If you had run the rollback at this point, with data already in column X, you would have lost that data. This means that rollbacks are only useful during the migration process inside a transaction, and at that point you might as well just use the transaction as the rollback mechanism.

Not everyone agrees with always rolling forward, and that is ok, but this is the way we at Unacast think of migrations.

Why couldn’t we just use an existing migration framework?

With regard to the rollback issue, we could most likely have used an existing framework and simply not used its rollback feature. The main reason we wrote our own framework was that none of the frameworks we could find supported reading migration scripts from assets. In most of our Go applications we compile everything into one file using go-bindata, meaning that the SQL files go through a pre-compile step where we generate Go files from the contents of the SQL scripts. This, together with our opinionated view of migrations, made us write a new migration framework.

Implementing a migration framework

Before implementing something, it is always good to think about what you want to achieve. The minimum list of features for this project was:

  • Run migrations from assets
  • Run migrations from files
  • Run all migrations that have not been run before, in one transaction
  • It should use the standard sql.DB type from the database/sql package
  • Only roll forward

That does not sound too hard, and as we will see, it is not. The last point actually makes things a little easier, since we are leaving out one feature, rolling back, compared to most other migration frameworks. Once we know the set of features, we need to define the main flow, and it turns out to be quite simple:

  1. Verify that a migrations table exists; it is needed to keep track of which migrations have been executed
  2. Get all migrations that have been executed
  3. Get all migrations
  4. Start a transaction
  5. Loop over and execute all migrations, skipping those that have already been executed
  6. Commit the transaction if everything is ok, otherwise roll back the transaction (note that this is not a rollback of the migration, just a rollback of the transaction)
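
To make the flow concrete, here is a stripped-down sketch of those steps using the standard database/sql package. This is an illustration only, not the framework’s actual code; the table name, SQL dialect and error handling are simplified:

func migrate(db *sql.DB, getFiles func() []string, getContent func(string) string) error {
    // 1. Verify that a migrations table exists.
    if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS migrations (id TEXT PRIMARY KEY)`); err != nil {
        return err
    }

    // 2. Get all migrations that have been executed.
    executed := map[string]bool{}
    rows, err := db.Query(`SELECT id FROM migrations`)
    if err != nil {
        return err
    }
    for rows.Next() {
        var id string
        if err := rows.Scan(&id); err != nil {
            rows.Close()
            return err
        }
        executed[id] = true
    }
    rows.Close()

    // 3. Get all migrations.
    files := getFiles()

    // 4. Start a transaction.
    tx, err := db.Begin()
    if err != nil {
        return err
    }

    // 5. Loop over and execute all migrations, skipping those already executed.
    for _, file := range files {
        if executed[file] {
            continue
        }
        if _, err := tx.Exec(getContent(file)); err != nil {
            tx.Rollback() // rolls back the transaction, not a migration
            return err
        }
        if _, err := tx.Exec(`INSERT INTO migrations (id) VALUES (?)`, file); err != nil {
            tx.Rollback()
            return err
        }
    }

    // 6. Commit the transaction if everything went well.
    return tx.Commit()
}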

Now we know everything we need to know to implement the migration framework. I will not cover all the code, which you can find on GitHub, but there is one thing I would like to cover. If you look at the signature of the Migrate function, it looks like this:

func (migrator *Migrator) Migrate(getFiles GetFiles, getContent GetContent) 

where GetFiles and GetContent have the following signatures:

type GetFiles func() []string
type GetContent func(string) string

The rationale behind this approach, instead of passing a folder path where all the files are, is that Migrate can take any function that returns a list of strings pointing to where the actual content is, and then use the second function to fetch that content. It also makes the framework very flexible, since it is agnostic to where and how the actual content is stored.

When writing your migrations, you will implement a function that has the signature of GetFiles and it will most likely do one of these two things:

  • Return a list of files in a folder or folder tree
  • Return a list of keys that you can use against some map to get content

What your function that implements GetContent should do depends on how you want to use the framework. If you are reading directly from files, the input to GetContent will be the file paths you get from your GetFiles function, and GetContent simply returns the content of those files. If you are using assets, your GetContent function should read from the asset framework instead of from disk.
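
For the file-based case, the two functions could look something like this (the folder path is illustrative and errors are ignored for brevity; ioutil.ReadDir returns entries sorted by filename, so migrations run in a predictable order):

getFiles := func() []string {
    infos, _ := ioutil.ReadDir("migrations/sql") // from the io/ioutil package
    files := make([]string, 0, len(infos))
    for _, info := range infos {
        files = append(files, info.Name())
    }
    return files
}
getContent := func(file string) string {
    bytes, _ := ioutil.ReadFile(filepath.Join("migrations/sql", file)) // path/filepath
    return string(bytes)
}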

Using the framework

We are still missing a little bit of documentation, but the readme should be enough to get you started. Running the migrations from inside your application is as simple as:

func runMigrations() {
    db, _ := connectToDb() // should return *sql.DB; error handling omitted for brevity
    getFiles := func() []string {
        files, _ := assets.AssetDir("migrations/sql")
        return files
    }
    getContent := func(file string) string {
        bytes, _ := assets.Asset(fmt.Sprintf("migrations/sql/%s", file))
        return string(bytes)
    }
    migrator := migrations.New(db)

    migrator.Migrate(getFiles, getContent)
    db.Close()
}

In the example above we are using assets, which have been generated from SQL files. The getFiles function returns a list of “file names” in the asset folder migrations/sql. The getContent function receives each entry from getFiles as input and reads the actual asset on each request. With those two defined we can call Migrate.

Please try it out and let us know what you think. If you have any problems, just register a GitHub issue or (even better) send us a PR.

In this blog post, we’d like to present one of our recent hires, Lars Bakke Krogvig. Lars works at Unacast as a Data Engineer, and his main responsibility is to make sure the data we receive from our partners flows steadily through our processing pipelines. In his spare time he’s a road cyclist, and almost as passionate about speed-solving Rubik’s cubes as he is about freestyle skiing.

Lars

Lars, who are you?

I’m Lars, 27 years old, born and raised in Oslo. I have a Master’s Degree in Applied Mathematics from NTNU in Trondheim. I used to be pretty serious about twin-tip skiing but never got seriously injured. Now I’m more into road cycling, and yes, I suppose I occasionally maybe devote some time to improve at solving Rubik’s cubes. I also like programming, and lots of other things!

What did you do before you started at Unacast?

I worked as an Information Management Consultant in the private sector, where I designed, built and maintained reporting systems.

Why did you start working at Unacast?

My main motivation was that I wanted to roll up my sleeves and work more hands-on with the design of information flows and the implementation of processing pipelines. As a consultant I felt distanced from the problems I solved; I was more of a helping hand than a stakeholder. I wanted to be more involved.

I also want to become better at programming, and so I felt that it was a good idea for me to do more of that in my work (and perhaps slightly less in my spare time).

Another important factor for me was the opportunity to work with emerging technologies, which I believe are more fun and give me a good skill set for the future.

What kind of problems do you solve, and what are the tools you use?

My main tasks are to rationalize the logic and sequence of what we do with data at Unacast and place that into a structured processing framework.

When working on data problems my job is to figure out where we are and where we want to go, and then to structure the steps in between and make it possible for everyone throughout the company to grasp what actually happens. I try to make complicated processes understandable for everyone, and have everyone around the table wholeheartedly nod and say “this makes sense”.

Regarding tools and methodology, I usually throw together a proof of concept using BigQuery and SQL, and when the concept is validated I put that into our pipelines with Dataflow.

What is the most fun thing about working at Unacast?

I really like that we have short lead times from an idea on a whiteboard to production, and the flexibility the toolset we use provides us with. It is also very rewarding (and sometimes scary) to see my ideas end up having a big impact on the business, and to have the power to influence the company in this way.

Beyond that I really enjoy working in a relaxed environment with colleagues I can have fun with at work. I get to travel to the US and work with people there, which is a new and rewarding experience for me. Everyone seems to tolerate my dry sense of humor, and they even respect me enough not to borrow my markers without asking, which is all anyone can ask for.

What will you gain from working at Unacast?

I see this as a rare opportunity to work with leading technologies and processes that are becoming standards internationally but have not yet gained much momentum here in Norway. I think this will be a big advantage for me in the long run.

Right now I benefit from being able to work from wherever I want, which gives me a lot of personal freedom few companies are able to offer. I get to learn a lot from our skilled developers and draw on their experience, which I think is really nice.

All in all, I get to do the things I want to do in a great environment, and have fun along the way!

I really want to use Dataflow, but Java isn't my 🍵
What to do?

A brief intro to Dataflow

Image credit Google

Dataflow is

A fully-managed cloud service and programming model for batch and streaming big data processing.

from Google. It is a unified programming model (recently open sourced as Apache Beam) and a managed service for creating ETL, streaming and batching jobs. It’s also seamlessly integrated with other Google Cloud services like Cloud Storage, Pub/Sub, Datastore, Bigtable and BigQuery. The combination of automatic resource management, autoscaling and the integration with the other Google Cloud services makes it a very attractive option for us.

So how do we use it?

Here at Unacast we receive large amounts of data, through both files and our APIs, that we need to filter, convert and pass on to storage in e.g. BigQuery. So being able to create both batch (files) and stream (APIs) based data pipelines using one DSL, running on our existing Google Cloud infrastructure, was a big win. As Ken wrote in his post on GCP, we try to use it every time we need to process a non-trivial amount of data or just need a continuously running worker. Ken also mentioned in that post that we found the Dataflow Java SDK less than ideal for defining data pipelines in code. The piping of transformations (pure functions) felt like something better represented in a proper functional language. We had a brief look at the Scala-based Scio by Spotify (which is also donated to Apache Beam, by the way). It looks promising, but we felt that its DSL diverged too much from the “native” Java/Beam one.

Next on our list was Datasplash, a thin Clojure wrapper around the Java SDK with a Clojuresque approach to pipeline definitions, using concepts such as the threading macro, map and filter mixed with regular Clojure functions. What’s not to like? So we went with Datasplash and have really enjoyed using it in several of our data pipeline projects. Since the Datasplash source is quite extensible and relatively easy to get a grasp of, we have even contributed a few enhancements and bugfixes to the project.

And in the blue corner, Datasplash!

It’s time to see how Datasplash performs in the ring, and to showcase that I’ve chosen to reimplement the StreamingWordExtract example from the Dataflow documentation. A Dataflow-off, so to speak.

The example pipeline reads lines of text from a PubSub topic, splits each line into individual words, capitalizes those words, and writes the output to a BigQuery table

Here’s how the code looks, and I’ll talk about some highlights of the pipeline composition below.
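
In condensed form, the pipeline looks roughly like this (topic, table, schema and option details are illustrative rather than exact):

(ns pipeline-example.core
  (:require [clojure.string :as str]
            [datasplash.api :as ds]
            [datasplash.bq :as bq]
            [datasplash.pubsub :as ps])
  (:gen-class))

(defn extract-words [line]
  (remove empty? (str/split line #"[^a-zA-Z']+")))

(defn ->table-row [word]
  {:string_field word})

(defn apply-transforms-to-pipeline [pipeline {:keys [input-topic output-table]}]
  ;; Option maps and the BigQuery schema are abbreviated sketches here.
  (->> pipeline
       (ps/read-from-pubsub input-topic {:name :read-from-pubsub})
       (ds/mapcat extract-words {:name :extract-words})
       (ds/map #(str/upper-case %) {:name :uppercase})
       (ds/map ->table-row {:name :to-table-row})
       (bq/write-bq-table output-table
                          {:schema [{:name "string_field" :type "STRING"}]})))

(defn -main [& args]
  (let [pipeline (ds/make-pipeline args)]
    (apply-transforms-to-pipeline pipeline {:input-topic  "projects/my-project/topics/my-topic"
                                            :output-table "my-project:dataset.streaming_word_extract"})
    (ds/run-pipeline pipeline)))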

First we have to create a pipeline instance; in theory it can be used several times to create parallel pipelines.

Then we apply the different transformation functions with the pipeline as the first argument. Notice that the pipeline has to be run in a separate step, passing the pipeline instance as an argument. This isn’t very functional, but it’s because of the underlying Java SDK.

Inside apply-transforms-to-pipeline we utilize the Threading Macro to start passing the pipeline as the last argument to the read-from-pubsub transformation. The Threading Macro will then pass the result of that transformation as the last argument of the next one, and so on and so forth.

Here we see the actual processing of the data. For each message from PubSub we extract the words (and flatten those lists with mapcat), uppercase each word and wrap it in a simple JSON row object. Notice the different ways we pass functions to map/mapcat.

Last, but not least we write the results as separate rows to the given BigQuery table.

And that’s it really! Each step is just the application of a simple, pure function.

Here’s a quick look at the graphical representation of the pipeline in the Dataflow UI.

This is the Dataflow UI view of the pipeline. 27,770 words have been added to BigQuery

Conclusion

To summarize, building Dataflow pipelines in Clojure using Datasplash has been a pleasant and exciting experience. I would like to emphasize a couple of things I think have turned out to be extra valuable.

  • The code is mostly well-known Clojure constructs, and the Datasplash-specific code tries to use the same semantics, like ds/map and ds/filter.
  • Having a REPL at hand in the editor to test small snippets and functions is very underestimated; I’ve found myself using it all the time.
  • Setting up aliases to run different pipelines (locally and in the ☁️ ) with different arguments via Leiningen has also been really handy when testing a pipeline during development (see the sketch after this list).
  • The conciseness and overall feeling of “correctness” when working in an immutable, functional LISP is also something that I’ve come to love even more now that I’ve tried it in a full-fledged project.
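
For reference, such aliases can be as simple as a couple of entries in project.clj (the namespace, project, bucket and runner arguments here are only placeholders):

:aliases {"run-local" ["run" "-m" "pipeline-example.core"
                       "--runner=DirectPipelineRunner"]
          "run-cloud" ["run" "-m" "pipeline-example.core"
                       "--runner=DataflowPipelineRunner"
                       "--project=my-gcp-project"
                       "--stagingLocation=gs://my-bucket/staging"
                       "--streaming=true"]}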