A Cost-effective Approach to Scaling Event-based Data Collection and Analysis

With millions of people now using Wanelo across various platforms, collecting and analyzing user actions and events becomes a pretty fun problem to solve. In most services, user actions generate some aggregated records in database systems, and keeping those actions non-aggregated is not explicitly required for the product itself. It is, however, critical for other reasons such as user history, behavioral analytics, spam detection and ad hoc querying.

If we were to split this problem into two sub-problems, they would probably be “data collection” and “data aggregation and analysis.”

UPDATE: please check out the following presentation from the Surge 2013 conference for another view into this project:

First question: Why don’t we use our relational database backend for this?

One way of solving these problems is with the use of a relational database. As most people use this successfully up to a point, we can call this a proven approach. Web applications and APIs already have clients to connect to the database backends and can insert new records (data collection solved). SQL already provides the perfect interface to the relational algebra that could be executed on the data for any purpose (data aggregation and analysis solved).

"Up to a point"?

Using an (open source) relational database is a very cost-effective solution, and can be scaled by optimizing configurations, isolating database servers, and queuing and batching writes. You can certainly buy yourself time on the data collection front with this approach. RDBMSes are highly adaptable for different use cases via configuration, and are multi-purpose.

But the data aggregation and analysis problem becomes more difficult as the traffic scales up. The data set grows so large that a single database server is unable to perform even a simple aggregation in a reasonable amount of time. Optimizing for querying (e.g., indexing) means presorting and writing extra data, which increases both the disk space used and the write throughput needed to commit these writes to disk.

At this point, a better solution is required.

Let’s focus on collecting this data first!


Sometimes easy solutions can become complicated. Sometimes it’s good to take a step back and look for simpler answers. "Simplicity is complexity resolved," after all :)

So one day we said (as no doubt others who have faced similar challenges have said): Wait, event data never changes. One user comes and does action X on object Y at time T on platform Z, and no part of this information ever changes. We can look at user action logs as system logs. Appending writes to files is super cheap and most of the building blocks to manipulate files as needed are out there in the OS environment. These problems were solved many years ago, just for different purposes.

There are many specialized software packages out there (old and new) for log collection, and many of them benefit from the properties above. Think Flume, syslog implementations, Fluentd and Scribe (last updated two years ago). All of these address different aspects of the data collection problem at different levels of granularity, and can be chosen and used per your requirements.

We ended up using Rsyslog, a syslog implementation, to collect logs from our application/API servers. The protocol is very simple, and syslog clients can be configured to buffer data on the client side during intermittent network outages. We wrote a really simple (literally a few lines of code) client wrapper that serializes a user event in a field-delimited format. A simplified version looks like this:

<user_id>|<platform>|<action>|<object_id>|<timestamp>

Our syslog server aggregates these event logs into huge field-delimited files, and logadm rotates these files into our NFS for storing them temporarily.
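Our client wrapper is not reproduced here, but a minimal shell sketch of the same idea might look like the following. The function names, the user_actions tag, the local0 facility and the epoch timestamp are all assumptions made for illustration; logger is the standard command-line syslog client.

```shell
# Hypothetical helper: join the five event fields with "|".
format_event() {
  # $1=user_id $2=platform $3=action $4=object_id $5=timestamp
  printf '%s|%s|%s|%s|%s\n' "$1" "$2" "$3" "$4" "$5"
}

# Hypothetical helper: hand the serialized event to the local syslog daemon,
# which can be configured to buffer it and forward it to the syslog server.
# The tag and facility are illustrative, not our production values.
log_event() {
  logger -t user_actions -p local0.info "$(format_event "$@")"
}

# Usage (the epoch timestamp format is an assumption):
#   log_event 42 mobile_web follow_action 1001 "$(date +%s)"
```

Keeping the serialization separate from the transport means the same line format survives if the log collector is ever swapped for something else.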

We just solved the data collection problem, hopefully for quite some time, by deploying Rsyslog along with a few lines of code. Note that syslog implementations have been around for years and are very stable, so the maintenance tasks involved are limited. Any point in the system (e.g., buffers) can be monitored, and as your data reliability and consistency requirements change over time, other solutions can replace the log collection.

OK. That was the easy part. Now how exactly do we analyze all this big data we hear so much about?

Science: It works, friends.

As you know, there is an area of computer science known as distributed computing. In 2004, Google applied it to a programming model for processing large data sets with a parallel, distributed algorithm on a cluster of similar nodes, and called it Map/Reduce.

As this is a model, there are different frameworks implementing these concepts in different ways. One very popular open-source toolset is Hadoop. Another one we’ve been watching closely is Spark.

While the details of how Map/Reduce works are beyond the scope of this post, it might be good to mention that this programming model requires a different way of thinking, and can be cumbersome at first for developers who are not familiar with it. In addition, depending on the framework, Map/Reduce solutions for problems might require extra implementation around how the data will be manipulated and will be flowing between different phases of the model.

One important practical aspect of deploying a typical Map/Reduce system is deciding whether it should be “on demand” or permanently deployed. Each option has a trade-off: running a semi-permanent cluster of decent size provides immediacy of the querying, but is expensive. Creating an “on-demand” cluster is much cheaper, but requires extra time to move the data from storage to the cluster before any queries can begin.

If extreme efficiency and cost are important, the “on-demand” approach is the way to go. This probably applies to many startups, and certainly applied to us.

Manta to the rescue!


While we were thinking about what would be the most cost-effective way of storing, aggregating and running our analytics queries on these huge log files, we heard about Joyent’s Manta, and had the chance to be involved in the Beta program.

Manta is Joyent’s new cloud-based object storage system, which enables the storing and processing of data simultaneously, without the need to move data between storage and compute. It offers strongly consistent data semantics, closer to UNIX file system properties, and, most interestingly, a native compute facility that allows processing of the objects in a fully featured SmartMachine environment, in parallel across many nodes.

The fundamental technology behind a SmartMachine is the concept of zones, which was inspired by FreeBSD jails. In Manta, every phase of a Map/Reduce job gets its own zone and the only communication between one phase and the next is through its output and input. Different zones are never aware of other zones on the system.

Most Map/Reduce frameworks aim to schedule the map phase near the data, so as not to move data around as much (data locality). In Manta terms, this means the zone will get instantiated “on top” of the data.

Ah, good old UNIX commands

Having a compute environment that makes it possible to use good old UNIX commands like grep, awk, cut, sed, uniq and sort on the data that will be streamed in between the phases seamlessly makes writing Map/Reduce jobs a really simple task, even for people with no prior experience. It’s also worth remembering that these commands have been optimized over the last few decades.

In Manta, the objects stored in the system are identified by keys (or paths). When a Map/Reduce job is being scheduled, it is provided with some keys referring to the objects whose content will be streamed as inputs to the first phase of the job defined. When the last phase of the job completes, a number of output objects are created in the system.

The compute zones for each phase start in a few seconds and start streaming and processing data. There is no configuration necessary (unless something specific is required) and the compute zones are totally managed by the framework.

Our integration

Since we rotate our field-delimited user action logs daily, a script uploads the latest file from our NFS to Joyent object storage on the same schedule. We also did a one-time upload of all the previous user action logs into the system.
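A daily upload along these lines can be sketched as below. This is an illustration rather than our actual script: the function names, the user_actions directory under /wanelo/stor and the local log directory are assumptions, while mput is the upload command from the Manta CLI tools.

```shell
# Hypothetical naming scheme: one object per day, keyed by date.
manta_key_for_date() {
  # $1 = date as YYYYMMDD
  printf '/wanelo/stor/user_actions/user_actions_%s.log\n' "$1"
}

# Upload one rotated log file to Manta.
# The directory layout on both sides is an assumption made for this sketch.
upload_daily_log() {
  # $1 = date as YYYYMMDD, $2 = local directory holding the rotated logs
  mput -f "$2/user_actions_$1.log" "$(manta_key_for_date "$1")"
}

# Usage (for example from a daily scheduled job):
#   upload_daily_log 20130626 /nfs/logs
```

Keying objects by date is what lets a later job select "last week" or "June" simply by choosing which keys to pass in.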

Considering this setup, all of the user action data is in the storage, partitioned by date, waiting to be processed for any sort of analysis. The data will get updated daily, but obviously this frequency can be changed per the time requirements for the analysis.

Querying user actions

Before jumping into a real life scenario, it might be good to explain what is happening with a few simple examples.

Most developers have had some experience with log parsing using a handful of UNIX commands while debugging things. All of these apply to our user action logs as well. Considering the simplified user action log pattern,

<user_id>|<platform>|<action>|<object_id>|<timestamp>

Let’s see what we could do if these log files were in our local environment:

  • How many unique users performed the ‘follow_action’ on the ‘mobile_web’ platform on 06/26/2013?
cat user_actions_20130626.log | \
  awk -F'|' '{ if( $2 == "mobile_web" && $3 == "follow_action" ) { print $1 } }' | \
  sort | \
  uniq | \
  wc -l
  • What is the total number of actions on the desktop platform on the same day?
cat user_actions_20130626.log | \
  grep -F -e '|desktop|' | \
  wc -l

What we are doing here is implementing basic relational algebra primitives with a couple of commands. Whatever your power tool of choice, everything is possible :)
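Grouping works the same way as selection and counting. As a further illustration (the function name is ours, made up for this sketch), here is a local equivalent of a GROUP BY on the platform field:

```shell
# Count actions per platform, roughly the equivalent of
#   SELECT platform, COUNT(*) FROM user_actions GROUP BY platform
# Reads log lines on stdin and prints "<platform> <count>", sorted by platform.
actions_per_platform() {
  awk -F'|' '{ count[$2]++ } END { for (p in count) print p, count[p] }' | sort
}

# Usage:
#   actions_per_platform < user_actions_20130626.log
```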

Manta provides a Map/Reduce framework that streams the contents of objects in the storage into the initial phase of a Map/Reduce job, and then connects the outputs of each phase to the inputs of the next, through to the last phase. This allows these types of commands to execute in parallel across many objects with a single API call. The data collection and filtering concepts we are using are not new, but we think Manta provides a very simple and familiar interface for taking this type of data analysis to the next level in terms of simplicity and efficiency.

In Manta, there are two phase types: map and reduce, corresponding to the map and reduce steps of the Map/Reduce model.

A map phase runs once for each object passed to it, so it produces multiple outputs. A reduce phase runs either over the outputs of a preceding map phase or over all of the input objects, and produces a single output.

Given this setup, we would be able to come up with a simple job that has one map phase and one reduce phase:

# Map phase:
grep -F -e '|follow_action|'

# Reduce phase:
wc -l

If we provide last week’s user action logs (last 7 objects) to the Map/Reduce job above, we would come up with the total number of follow actions that happened on all platforms within the last week. Since the map phases will run in parallel, the performance of this query would be similar to running it on a single day of user action logs. In theory, the reduce phase grows linearly with the number of objects involved, but the time spent in the map phase will not increase as we add new objects.
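To make the data flow concrete, here is a local stand-in for such a job. It is only a sketch: run_job_locally is a hypothetical helper, and the loop runs the map command sequentially, whereas Manta runs each map task in parallel in its own zone.

```shell
# Local stand-in for a one-map, one-reduce job: run the map command once per
# input file, concatenate the map outputs, and feed them to a single reduce.
run_job_locally() {
  map_cmd=$1
  reduce_cmd=$2
  shift 2
  for object in "$@"; do
    sh -c "$map_cmd" < "$object"
  done | sh -c "$reduce_cmd"
}

# Usage: total follow actions across several local log files.
#   run_job_locally "grep -F -e '|follow_action|'" "wc -l" user_actions_201306*.log
```

With the stock Manta CLI, the same two phases would be submitted with something like mjob create -o -m "grep -F -e '|follow_action|'" -r "wc -l", with the object keys supplied on standard input. We go through our own wrapper instead, so treat that invocation as an approximation.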

Most analytical queries will require a single final reduce step, which may become a bottleneck, but most of the processing/filtering can be moved to the mapping phase (if possible), resulting in high parallelization.
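Counting unique users across many objects is a good example. If each map task deduplicates its own object first, the single reduce only sees each user at most once per object. A sketch with hypothetical function names, runnable locally:

```shell
# Map phase: emit each user id at most once per object.
map_unique_users() {
  awk -F'|' '{ print $1 }' | sort -u
}

# Reduce phase: merge the per-object lists and count the distinct ids.
reduce_count_unique() {
  sort -u | wc -l
}

# Usage (sequential local stand-in; Manta would run the map tasks in parallel):
#   for f in user_actions_201306*.log; do map_unique_users < "$f"; done | reduce_count_unique
```

The reduce still has to sort, but its input shrinks from one line per action to at most one line per user per object.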

To make things clearer, let’s take a look at a concrete example: how we calculate our retention metrics.

Example: cohort retention analysis

Cohort retention answers the following question: given a specific group of users who registered in some period, how many of them came back and did something after some amount of time?

We will do this in two Map/Reduce jobs. The first one will count the size of the cohort and, as a side effect, will store the user ids of this cohort into an object. The second job will count how many of these users came back and did something within the given time window.

Cohort calculation:

# Map phase:
awk -F'|' '{ if( $3 == "register_action" ) { print $1 } }'

# Reduce phase:
sort | \
  uniq | \
  mtee /wanelo/stor/tmp/cohort_user_ids | \
  wc -l

# Objects passed: a set of action logs in the date range
# of the cohort we are considering

Retention calculation:

# Map phase:
awk -F'|' '{ print $1 }'

# Reduce phase:
sort | \
  uniq > period_uniq_ids && \
  comm -12 period_uniq_ids /assets/wanelo/stor/tmp/cohort_user_ids | \
  wc -l

# Objects passed: a set of action logs in the date range
# of the time window we would like to see the retention for
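The two jobs can be tried out on local files before running them on Manta. The sketch below mirrors them with plain files; the function names are hypothetical, and a local file stands in for the object written by mtee.

```shell
# Write the sorted, distinct ids of users who registered in the given logs
# to the file named by $1 (the local counterpart of the mtee'd object).
build_cohort() {
  cohort_file=$1
  shift
  cat "$@" | awk -F'|' '$3 == "register_action" { print $1 }' | sort -u > "$cohort_file"
}

# Count the cohort members who performed any action in the given logs.
# comm -12 prints only the lines common to both inputs, which must be sorted.
count_retained() {
  cohort_file=$1
  shift
  cat "$@" | awk -F'|' '{ print $1 }' | sort -u | comm -12 - "$cohort_file" | wc -l
}

# Usage:
#   build_cohort cohort_user_ids user_actions_20130601.log
#   count_retained cohort_user_ids user_actions_20130626.log
```

Dividing the retained count by the number of lines in the cohort file gives the retention rate for that window.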

For us, generating cohort retention reports that span a large portion of our historical data requires going through billions (order of magnitude) of rows. The approach described above lets us extract the results in under a couple of minutes.

Long story short, we got

No sampling, perfect significance, fully parallelized, 4 lines of UNIX commands that are passed as a configuration to our underlying Map/Reduce wrapper using a Manta client.
Neat.

We also came up with our own domain-specific helper methods that generate these types of queries and poll Manta for results. Our internal metrics use this API heavily and report aggregated results into our time series DB. Then our dashboards use this time series DB for graphing aggregated metrics.

Conclusions!

For storing and analyzing user activity data, a file-based approach to data collection was more appropriate for us than a database-centric approach, since we knew we would be able to express many of our analytics queries as UNIX commands on the user action logs. Taking advantage of proven technologies such as syslog provided a very cost-effective data collection solution.

Running these queries on Joyent’s Manta naturally extended this method to work across many objects in parallel, dramatically shortening the time required for them.

Manta was instrumental, as it offloaded the management of the data flow for Map/Reduce, simplifying our job for streaming data in and out of the computation.

Manta also provided a compute environment that is as friendly as a local environment, greatly simplifying operations and making everything intuitive. This removed some of the barriers to getting started with parallelized computations, without preventing us from uploading custom Map/Reduce scripts or binaries for more complex or efficient computations.

At the end of the day, we ended up with a powerful internal analytics framework that’s easy to learn, maintain and extend.

As Manta was just announced a few days ago, we wanted to share some of our experience and insights on how we were able to utilize it in production. We hope this was useful for anyone currently solving similar problems or evaluating their options.

Please feel free to reach out to us with questions or comments!

-Atasay