Apr 4, 2014

Twitch Data Analysis — Part 1 of 3: The Twitch Statistics Pipeline

Editor’s Note: This content is also mirrored on the author’s personal blog. So check that out, too, for this and other musings.

This is part one of a three-part series covering all aspects of our data analysis. This part describes our pipeline. The second part will go into detail concerning particular design decisions. The third part covers the history of analytics at Twitch.

In the beginning, we logged all of our data to Mixpanel. As our user base grew, we logged an increasing number of events; this growth in data points vastly outstripped our user growth, to the point where we were sending Mixpanel billions of events per week. As our growth continued, we needed to make better decisions based on joining different events together to gain really deep insight into our users’ behaviour. Count-based metrics, such as those provided by Mixpanel or statsd, are insufficient for this, and given the ever-increasing cost of Mixpanel, we brought together a team to work on storing our event data in an economical fashion while providing the tools to query it without those downsides.

Today we have replaced the near-real-time nature of Mixpanel with our own bulk-ingest solution, which achieves our goal of <24-hour data latency. Currently our latency is ~5 hours, and we’ll soon be working on a “fast layer” on top of our current solution to power dashboards displaying real-time stats.

We formed the team at the tail end of 2013 and began work on the project in earnest at the start of 2014. The new pipeline, currently in its third revision, is hosted in AWS, where we use Asgard to coordinate deploys and elastic scaling. The pipeline is composed of the following core components:

  1. The client sending us the stats

  2. An edge server

  3. A processor

  4. An ingest coordinator

  5. Storage

The Client

Naturally the pipeline starts with users on client applications. We have a wide variety of clients: Xbox One, iOS, our website, our Flash video player, various Android ports, etc. Since these platforms all have mature and robust HTTP clients, we use HTTP to send events to our stats pipeline edge servers.

These clients know when to emit events that are pertinent to our product as a whole. The most common stat that the clients log is “minutes-watched”; each of our video players emits this for each minute of video watched by a viewer, along with a bundle of JSON metadata describing the client’s state. The most critical stat is “buffer-empty”, which is sent when video stutters. When this happens, we know someone had a poor viewing experience, and our video and networking teams base critical scaling decisions on these events as part of our never-ending pursuit of great global video QoS.
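To make that concrete, here is a minimal sketch in Go of what a minutes-watched event might look like; the field names and values are hypothetical stand-ins rather than our actual schema:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // minutesWatched sketches a hypothetical "minutes-watched" event; the
    // real events carry a much richer bundle of metadata about client state.
    type minutesWatched struct {
        Event    string `json:"event"`     // "minutes-watched"
        Channel  string `json:"channel"`   // channel being watched
        Player   string `json:"player"`    // e.g. "flash", "ios", "xbox_one"
        DeviceID string `json:"device_id"` // semi-permanent client identifier
        Time     int64  `json:"time"`      // unix timestamp, seconds
    }

    func main() {
        ev := minutesWatched{
            Event:    "minutes-watched",
            Channel:  "examplechannel",
            Player:   "flash",
            DeviceID: "device-1234",
            Time:     1396569600,
        }
        b, err := json.Marshal(ev)
        if err != nil {
            panic(err)
        }
        fmt.Println(string(b))
    }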

The Edge Server

Our clients send base64-encoded JSON objects to our edge server using HTTP GETs. The edge server consists of an nginx instance and a logging daemon written in Golang. The nginx instance merely passes the HTTP request to the logging daemon, which in turn logs the request to a file. Every 100MB of logged data, we upload the log to S3 and emit an SQS message with data about the file to a queue that the processor listens on.

An example data packet from a client.
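As a rough sketch of how a client might package and send such a packet, assuming a hypothetical edge endpoint and “data” query parameter:

    package main

    import (
        "encoding/base64"
        "fmt"
        "net/http"
        "net/url"
    )

    // sendEvent base64-encodes a JSON event and delivers it to the edge layer
    // as an HTTP GET. The host and "data" query parameter are placeholders.
    func sendEvent(jsonPayload []byte) error {
        encoded := base64.StdEncoding.EncodeToString(jsonPayload)
        endpoint := "https://events.example.twitch.tv/track?data=" + url.QueryEscape(encoded)
        resp, err := http.Get(endpoint)
        if err != nil {
            return err
        }
        defer resp.Body.Close()
        fmt.Println("edge responded with", resp.Status)
        return nil
    }

    func main() {
        payload := []byte(`{"event":"minutes-watched","channel":"examplechannel"}`)
        if err := sendEvent(payload); err != nil {
            fmt.Println("send failed:", err)
        }
    }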

These edge servers sit behind an ELB and are configured as an Auto Scaling Group. We scale up and down based on the overall traffic level we see coming into the cluster. Our primary concern is to ensure that we do not lose too much data in the event of a machine being taken down by something at AWS [1].

The logging daemon, which we’ll soon open source, can be configured to rotate log files on two criteria:

  1. creation time

  2. file size

This permits us to minimize our risk of data loss even during low tide, though in practice, with 100MB rotations, we never end up rotating on creation time. When a file is rotated, the daemon signals an in-process uploader to upload it to an S3 bucket and publishes a corresponding SQS message.
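A minimal sketch of that rotation check, with hypothetical thresholds standing in for the daemon’s real configuration:

    package main

    import (
        "fmt"
        "os"
        "time"
    )

    // Hypothetical rotation thresholds; the production daemon rotates at 100MB,
    // and the age limit bounds how long a quiet period can delay an upload.
    const (
        maxAge  = 1 * time.Hour
        maxSize = 100 << 20 // 100MB
    )

    // shouldRotate reports whether a log file is due for rotation, either
    // because it has been open too long or because it has grown too large.
    func shouldRotate(f *os.File, openedAt time.Time) (bool, error) {
        info, err := f.Stat()
        if err != nil {
            return false, err
        }
        return time.Since(openedAt) >= maxAge || info.Size() >= maxSize, nil
    }

    func main() {
        f, err := os.CreateTemp("", "edge-log-")
        if err != nil {
            panic(err)
        }
        defer os.Remove(f.Name())
        defer f.Close()

        rotate, err := shouldRotate(f, time.Now())
        if err != nil {
            panic(err)
        }
        fmt.Println("rotate now?", rotate) // false: the file is new and empty
    }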

The Processor

The processor is a daemon, written in Golang, that listens to the SQS queue the edge layer publishes to.

For each inbound file, the processor unpacks the data and, assuming it is valid, writes it out to a target file: one file for each type of event we have. The processor uses the same logging library as the edge server; however, it rotates at either 5 hours or 1GB of data. The processor will continue to write to the same output file until it is rotated out of the way, which once again triggers an S3 upload and an accompanying SQS message.

The target file is a gzipped TSV [2] with a line per processed packet. Subsequent runs, in between rotations, append to the same file; gzip supports this form of appending (go ahead and try catting two gzip files into one and then zcatting them!). The ordering of the columns in the TSV is critical, since it must match the ordering of the columns in the target table. The ordering is informed by a “schema” server we have, which is discussed further in the Storage section of this post.

An example conversion. Not a great one :)
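A rough sketch of what such a conversion might look like, assuming a hypothetical four-column schema; it also demonstrates the gzip-append behaviour described above, since each append adds a gzip member and the result still decompresses as one stream:

    package main

    import (
        "compress/gzip"
        "fmt"
        "io"
        "os"
        "strings"
    )

    // rowFromEvent flattens a decoded event into TSV column order. The schema
    // slice stands in for the column ordering served by our schema server;
    // properties missing from an event simply become empty fields.
    func rowFromEvent(event map[string]string, schema []string) string {
        cols := make([]string, len(schema))
        for i, name := range schema {
            cols[i] = event[name]
        }
        return strings.Join(cols, "\t") + "\n"
    }

    // appendGzipMember appends one gzip member to the target file. gzip
    // tolerates this: concatenated members decompress as a single stream,
    // which is what lets the processor keep appending between rotations.
    func appendGzipMember(path, data string) error {
        f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
        if err != nil {
            return err
        }
        defer f.Close()
        zw := gzip.NewWriter(f)
        if _, err := io.WriteString(zw, data); err != nil {
            return err
        }
        return zw.Close()
    }

    func main() {
        // Hypothetical schema and events; real events carry far more metadata.
        schema := []string{"device_id", "time", "channel", "player"}
        events := []map[string]string{
            {"device_id": "abc123", "time": "1396569600", "channel": "examplechannel", "player": "flash"},
            {"device_id": "def456", "time": "1396569660", "channel": "examplechannel", "player": "ios"},
        }

        const path = "minutes_watched.tsv.gz" // hypothetical per-event output file
        for _, ev := range events {
            if err := appendGzipMember(path, rowFromEvent(ev, schema)); err != nil {
                panic(err)
            }
        }

        // Reading it back: one gzip reader (or zcat) handles both members.
        f, err := os.Open(path)
        if err != nil {
            panic(err)
        }
        defer f.Close()
        zr, err := gzip.NewReader(f)
        if err != nil {
            panic(err)
        }
        out, err := io.ReadAll(zr)
        if err != nil {
            panic(err)
        }
        fmt.Print(string(out))
    }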

The Ingest Coordinator

As with the processing layer, this layer receives SQS messages and triggers imports into our storage layer. We use Redshift to store our data. Redshift offers numerous ways to import data; we use the COPY command since it can read from S3 and supports gzipped files. Our decision to store files as TSVs stems from wanting to be compatible with Redshift’s COPY.

The ingester itself is an area where we have room for significant improvement; notably, Redshift supports manifest files, which can dramatically increase the performance of imports. Currently, it takes us around 3 hours a day to ingest 24 hours of data. As we scale up, we’ll need to improve this rate: the manifest-file option permits each node to be responsible for a file, versus our current strategy of one file at a time per COPY command.
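For reference, a COPY manifest is just a JSON document listing the S3 objects to load, which lets the cluster fetch them in parallel. A hedged example, held in a Go constant purely for illustration, with placeholder bucket and keys:

    package ingest

    // exampleManifest shows the shape of a Redshift COPY manifest: a JSON
    // document listing the S3 objects to load so the cluster can fetch them
    // in parallel. The bucket and keys here are placeholders.
    const exampleManifest = `{
        "entries": [
            {"url": "s3://example-processed-bucket/minutes_watched/part-000.tsv.gz", "mandatory": true},
            {"url": "s3://example-processed-bucket/minutes_watched/part-001.tsv.gz", "mandatory": true}
        ]
    }`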

An example COPY
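Roughly what one of our current single-file COPYs looks like when issued from Go over a standard Postgres driver (Redshift speaks the Postgres wire protocol); the connection string, table, bucket, key, and credentials are all placeholders:

    package main

    import (
        "database/sql"
        "fmt"

        _ "github.com/lib/pq" // Redshift speaks the Postgres wire protocol
    )

    func main() {
        // The connection string, table, bucket, key, and credentials below are
        // all placeholders; substitute real values for a real cluster.
        db, err := sql.Open("postgres",
            "postgres://user:password@example-cluster.example.us-east-1.redshift.amazonaws.com:5439/stats?sslmode=require")
        if err != nil {
            panic(err)
        }
        defer db.Close()

        // GZIP tells COPY the file is compressed, DELIMITER matches our TSVs,
        // and FILLRECORD pads rows written before a column was added.
        copyCmd := fmt.Sprintf(`
            COPY minutes_watched
            FROM 's3://example-processed-bucket/minutes_watched/2014-04-04/part-000.tsv.gz'
            CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s'
            DELIMITER '\t' GZIP FILLRECORD`,
            "ACCESS_KEY_PLACEHOLDER", "SECRET_KEY_PLACEHOLDER")

        if _, err := db.Exec(copyCmd); err != nil {
            panic(err)
        }
    }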

Storage

We have a four-node dw1.xlarge Redshift cluster in which we have a table per event type. Each table has a column per property. Redshift permits you to specify a distribution key, which is used during import to ensure that data you intend to join on is stored on the same node. This minimizes leader-node work and network traffic, thus improving join performance massively. In our case, we distribute on a semi-permanent [3] client identifier and sort on timestamp.
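For illustration, the DDL for one of these per-event tables might look something like the sketch below; the table name, column names, and types are hypothetical:

    package schema

    // createMinutesWatched is hypothetical DDL for one per-event table. The
    // DISTKEY keeps rows for the same device on one node (good for joins on
    // the client identifier), and the SORTKEY orders each node's rows by time.
    const createMinutesWatched = `
    CREATE TABLE minutes_watched (
        device_id VARCHAR(64)  DISTKEY,
        time      TIMESTAMP    SORTKEY,
        channel   VARCHAR(255),
        player    VARCHAR(32)
    )`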

Our schema server coordinates the processor layer and the storage layer. Since TSVs have no header row, the order of the entries is critical and must match the target table. The processor layer caches a version of the current known schema for a few minutes at a time, and as it processes data it emits the TSV with data in the correct column order. Each time a new event is to be logged, we use the schema server to issue the necessary CREATE TABLE commands in Redshift. If a new property is to be added to an event, the schema server issues an ALTER TABLE to add the corresponding column. Our ingester uses the FILLRECORD modifier to the COPY command, which prevents these ALTERs from breaking imports. This structure allows us to rapidly log new and richer data, which helps our product teams make rapid decisions. It is a short-term aim of ours to make the schema server really bulletproof so that we do not need to be part of this flow.
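A sketch of that ALTER path, assuming a hypothetical helper inside the schema server; the real service is more involved than a single function:

    package schema

    import "database/sql"

    // addColumn sketches what the schema server does when a new property is
    // added to an event: issue an ALTER TABLE for the corresponding column.
    // This helper and its signature are illustrative, not our real API.
    func addColumn(db *sql.DB, table, column, columnType string) error {
        // Files written before this ALTER contain one fewer column; the
        // FILLRECORD option on our COPYs keeps them loadable regardless.
        _, err := db.Exec("ALTER TABLE " + table + " ADD COLUMN " + column + " " + columnType)
        return err
    }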

Conclusion

This design allows us to reach ~100,000 inserts a second; previous versions of the pipeline capped out far below that. We have some ambitious plans for the future, most notably building an ETL flow that permits us to create a richer data set by combining our current event data into something that tells a more holistic story of the people using our product. An easy-to-grok example of data we’re interested in: which initial referrers turn into the most loyal users, and how is that loyalty impacted by QoS? While it sounds simple, there is the very interesting task of aggregating loyalty per referrer while also computing the initial referrer: do we take the entire data set and run it through a MapReduce job? How do we do that? What intermediate data do we store to prevent this from becoming an agonizingly long query in the not-too-distant future? What happens if we load data from the past that may not have been loaded due to an error, and that changes the initial referrer for a range of users?

If you’re interested in knowing more, or have strong ideas about where we can bound our problems, please feel free to contact us; details can be found on our team page. Additionally, feel free to comment on the Hacker News thread.

Footnotes

[0] Ideally we’d like CLOUD_ENVIRONMENT to be ‘integration’, but Asgard makes that difficult; it effectively maps CLOUD_ENVIRONMENT to an AWS account, which in this case would mean a separate AWS account for our integration environment. That doesn’t jibe so well with the AWS management console. Using CLOUD_DEV_PHASE felt like a reasonable compromise here.

[1] We’re currently only running in one AWS AZ; near-term work will move us to running edges on the east and west coasts of the USA, as well as in the EU and possibly São Paulo.

[2] We’re looking into alternatives for this. Redshift understands the format natively, but it is not convenient when running MapReduce jobs, since gzipped files cannot be split easily.

[3] It is actually quite stunning how non-permanent permanent storage is. For example, some consoles do not give applications the ability to store data, so instead you must compute an identifier in a consistent way. Of course, there is also the iOS UUID situation.

Appendix

Noteworthy mention: Asgard and Cloud Configuration

At the heart of our operation is Asgard, which, when fully configured, initializes user data on your EC2 instances as they come up. We use the excellent goamz library from CrowdMob to source this user data and make run-time decisions about where data should flow. For example, our integration test servers have CLOUD_DEV_PHASE set, which we use to override CLOUD_ENVIRONMENT [0]; this determines which SQS queues and S3 buckets we use at each stage of the pipeline.
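Conceptually the run-time decision boils down to fetching the instance’s user data and preferring CLOUD_DEV_PHASE when it is set. A simplified stand-in (goamz does the real work in our pipeline, and the KEY=VALUE parsing here is an assumption about the user-data format):

    package main

    import (
        "fmt"
        "io"
        "net/http"
        "strings"
    )

    // fetchUserData reads the raw EC2 user data for this instance from the
    // instance metadata endpoint. In the pipeline itself goamz does this
    // work; this is a simplified stand-in.
    func fetchUserData() (string, error) {
        resp, err := http.Get("http://169.254.169.254/latest/user-data")
        if err != nil {
            return "", err
        }
        defer resp.Body.Close()
        b, err := io.ReadAll(resp.Body)
        return string(b), err
    }

    // environment picks the effective environment: CLOUD_DEV_PHASE wins when
    // set, otherwise CLOUD_ENVIRONMENT. The KEY=VALUE parsing below is an
    // assumption about the user-data format, for illustration only.
    func environment(userData string) string {
        vars := map[string]string{}
        for _, line := range strings.Split(userData, "\n") {
            line = strings.TrimPrefix(strings.TrimSpace(line), "export ")
            if k, v, ok := strings.Cut(line, "="); ok {
                vars[k] = strings.Trim(v, `"`)
            }
        }
        if phase := vars["CLOUD_DEV_PHASE"]; phase != "" {
            return phase
        }
        return vars["CLOUD_ENVIRONMENT"]
    }

    func main() {
        ud, err := fetchUserData()
        if err != nil {
            fmt.Println("not on EC2 or metadata unavailable:", err)
            return
        }
        fmt.Println("environment:", environment(ud))
    }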

Spade is our internal name for the pipeline. It flies against my religion of naming things what they do!
