Skip to content

Latest commit

 

History

History
221 lines (120 loc) · 29.8 KB

Chapter2.md

File metadata and controls

221 lines (120 loc) · 29.8 KB

#Chapter 2. Data Integration

The first application I want to dive into is data integration. Let me start by explaining what I mean by data integration and why I think it’s important, then we’ll see how it relates to logs.

Data integration means making available all the data that an organization has to all the services and systems that need it.

The phrase “data integration” isn’t all that common, but I don’t know a better one. The more recognizable term ETL (extract, transform, and load) usually covers only a limited part of data integration—populating a relational data warehouse. However, much of what I am describing can be thought of as ETL that is generalized to also encompass real-time systems and processing flows.

You don’t hear much about data integration in all the breathless interest and hype around the idea of big data; nevertheless, I believe that this mundane problem of making the data available is one of the more valuable goals that an organization can focus on.

Effective use of data follows a kind of Maslow’s hierarchy of needs. The base of the pyramid shown in Figure 2-1 involves capturing all the relevant data and being able to put it together in an applicable processing environment (whether a fancy real-time query system or just text files and Python scripts). This data needs to be modeled in a uniform way to make it easy to read and process. Once the basic needs of capturing data in a uniform way are taken care of, it is reasonable to work on infrastructure to process this data in various ways: MapReduce, real-time query systems, and so on.

chapter02 f1

Figure 2-1. A maslow-like hierarchy for using data. The collection and modeling of data remains a primary concern until these are mastered, then the focus can move to higher level goals.

It’s worth noting the obvious: without a reliable and complete data flow, a Hadoop cluster is little more than a very expensive and difficult-to-assemble space heater. Once data and processing are available, you can move on to more refined problems such as good data models and consistent, well understood semantics. Finally, concentration can shift to more sophisticated processing: better visualization, reporting, and algorithmic processing and prediction.

In my experience, most organizations have huge holes in the base of this pyramid—they lack reliable, complete data flow but want to jump directly to deep learning and advanced data modeling techniques. This is completely backwards.

The question is, how can we build reliable data flow throughout all the data systems in an organization?

##Data Integration: Two complications

Two trends have made data integration an increasingly difficult problem.

###Data is More Diverse

I think if you asked a company 15 years ago what data they had, they would describe their transactional data, such as users, products, orders, and other items kept in tables in relational databases.

However, our definition has broadened. Now most companies would also include event data. Event data records things that happen rather than things that are. In web systems, this means user activity logging, as well as the machine-level events and statistics required to reliably operate and monitor a data center’s worth of machines. People tend to call this “log data” since it is often written to application logs, but that confuses form with function. This data is at the heart of the modern Web: Google’s fortune, after all, is generated by a relevance pipeline built on clicks and impressions—that is, events.

This stuff isn’t limited to web companies, it’s just that web companies are already fully digital, so they are easier to instrument and measure. Financial data has long been event-centric. RFID adds this kind of tracking to physical objects. I think this trend will continue with the digitization of traditional businesses and activities. The “Internet of Things,” though a bit of a buzzword, seeks to describe the trend of connecting physical devices to the digital world. One of the primary motivations for this is to record event data about what these physical devices are doing so we can extend our digital modeling to the physical world and broaden the scope of things that can be programmed and optimized with software.

This type of event data shakes up traditional data integration approaches because it tends to be several orders of magnitude larger than transactional data.

The second trend is the explosion of specialized data systems that have become popular and often freely available in the last five years. Specialized systems exist for OLAP,search, simple online storage, batch processing, graph analysis, and so on.

The combination of more data of more varieties and a desire to get this data into more systems leads to a huge data integration problem.

###Log-Structured Data Flow

How can we attack this problem? Well, it turns out that the log is the natural data structure for handling data flow between systems. The recipe is very simple:

Take all of the organization’s data and put it into a central log for real-timesubscription.

Each logical data source can be modeled as its own log. A data source could be an application that logs events (such as clicks or page views), or a database table that logs modifications. Each subscribing system reads from this log as quickly as it can, applies each new record to its own store, and advances its position in the log. Subscribers could be any kind of data system: a cache, Hadoop, another database in another site, a search system, and so on (see Figure 2-2).

chapter02 f2

Figure 2-2. A log can be used for publish-subscribe messing. The publisher appends to the log and each subscriber keeps a pointer to its position in the log, allowing it to read independently.

The log concept gives a logical clock for each change against which all subscribers can be measured. This makes reasoning about the state of the different subscriber systems with respect to one another far simpler, as each has a point in time up to which it has read.

To make this more concrete, consider a simple case where there is a database and a collection of caching servers. The log provides a way to synchronize the updates to all of these systems and reason about their current state based on the point of time they are up to in the log. Let’s say we write a record with log entry X and then need to do a read from the cache. If we want to guarantee that we don’t see stale data, we just need to ensure that we don’t read from any cache that has not replicated up to X.

The log also acts as a buffer that makes data production asynchronous from data consumption. This is important for many reasons, but particularly when there are multiple subscribers that consume at different rates. This means a subscribing system can crash or go down for maintenance and catch up when it comes back up: the subscriber consumes at a pace it controls. A batch system such as Hadoop or a data warehouse might consume only hourly or daily, whereas a real-time query system might need to be up-to-the-second. Neither the originating data source nor the log has knowledge of the various data destination systems, so consumer systems can be added and removed with no change in the pipeline.

chapter02 f3

Figure 2-3. "Each working data pipeline is designed like a log; each broken data pipeline is broken in its own way." --Count Leo Tolstoy (tanslation by the author)

Of particular importance: the destination system only knows about the log and does not know any details of the system of origin. The consumer system need not concern itself with whether the data came from a relational database, a new-fangled key-value store, or was generated directly by some application. This seems like a minor point, but is, in fact, critical.

I use the term “log” here instead of “messaging system” or “pub sub” because it is much more specific about semantics and a much closer description of what you need in a practical implementation to support data replication. I have found that “publish subscribe” doesn’t imply much more than indirect addressing of messages—if you compare any two messaging systems that both promise publish-subscribe, you find that they guarantee very different things, and most models are not useful in this domain. You can think of the log as acting as a kind of messaging system with durability guarantees and strong ordering semantics. In distributed systems, this model of communication sometimes goes by the (somewhat terrible) name of atomic broadcast.

It’s worth emphasizing that the log is still just the infrastructure. This isn’t the end of the story of mastering data flow: the rest of the story is around metadata, schemas, compatibility, and the details of handling data structure and evolution. Until there is a reliable, general way of handling the mechanics of data flow, the semantic details are secondary.

###My Experience at LinkedIn

I got to watch this data integration problem emerge in fast-forward during my time at LinkedIn as we moved from a centralized relational database to a collection of distributed systems; I’ll give an abbreviated history of my experiences with these ideas.

These days, the major data systems LinkedIn runs include:

  • Search
  • Social graph
  • Voldemort (key-value store)
  • Espresso (document store)
  • Recommendation engine and ad serving systems
  • OLAP query engine
  • Hadoop
  • Terradata
  • Ingraphs (monitoring graphs and metrics services)
  • Newsfeed (the system that serves updates on the home page)

Each of these is a specialized distributed system that provides advanced functionality in its area of specialty.

This idea of using logs for data flow has been floating around LinkedIn since even before I got there. One of the earliest pieces of infrastructure we developed was a service called databus. Databus provided a log caching abstraction on top of our early Oracle tables to scale subscription to database changes so we could feed our social graph and search indexes.

My own involvement in this started around 2008 after we had shipped our key-value store. My next project was to try to get a working Hadoop setup going, and move some of our recommendation processes there. Having little experience in this area, we naturally budgeted a few weeks for getting data in and out, and the rest of our time for implementing fancy prediction algorithms. So began a long slog.

chapter02 f4

Figure 2-4. ETL in ancient Greece. Not much has changed.

We originally planned to just scrape the data out of our existing Oracle data warehouse. The first discovery was that getting data out of Oracle quickly is something of a dark art. Worse, the data warehouse processing was not appropriate for the production batch processing we planned for Hadoop—much of the processing was non-reversable and specific to the reporting being done. We ended up avoiding the data warehouse and going directly to source databases and log files. Finally, we implemented another pipeline to load data into our key-value store for serving results.

Each of these pipelines ended up being a significant engineering project. They had to be scaled to run across multiple machines. They had to be monitored, tested, and maintained. This mundane data copying ended up being one of the dominant items in the development we were doing. Worse, anytime there was a problem in one of the pipelines, as there often was in those days, the Hadoop system was largely useless—running fancy algorithms on bad data just produces more bad data.

Although we had built things in a fairly generic way, each new data source required custom configuration to set up. It also proved to be the source of a huge number of errors and failures. The site features we had implemented on Hadoop became popular and we found ourselves with a long list of interested engineers. Each user had a list of systems they wanted integration with and a long list of new data feeds they wanted.

First, the pipelines we had built, even though a bit of a mess, were actually extremely valuable. Just the process of making data available in a new processing system (Hadoop) unlocked many possibilities. It was possible to do new computation on the data that would have been hard to do before. Many new products and analysis came from simply putting together multiple pieces of data that had previously been locked up in specialized systems.

Second, it was clear that reliable data loads would require deep support from the data pipeline. If we captured all the structure we needed, we could make Hadoop data loads fully automatic, so that no manual effort was expended when adding new data sources or handling schema changes. Data would just magically appear in HDFS, and Hive tables would automatically be generated for new data sources with the appropriate columns.

Third, we still had very low data coverage. That is, if you looked at the overall percentage of the data LinkedIn had that was available in Hadoop, it was still very incomplete. Getting to completion was not going to be easy given the amount of effort required to operationalize each new data source.

The way we had been proceeding—building out custom data loads for each data source and destination—was clearly infeasible. We had dozens of data systems and data repositories. Connecting all of these would have led to building custom piping between each pair of systems, looking something like Figure 2-5.

chapter02 f5

Figure 2-5. A fully connnected architecture that has a separate pipleline between each system

Note that data often flows in both directions, as many systems (databases and Hadoop) are both sources and destinations for data transfer. This meant that we would end up building two pipelines per system: one to get data in and one to get data out.

This clearly would take an army of people to build and would never be operable. As we approached full connectivity, we would end up with something like O(N2) pipelines.

Instead, we needed something generic, as shown in Figure 2-6.

chapter02 f6

Figure 2-6. An architecture built around a central hub

As much as possible, we needed to isolate each consumer from the source of the data. The consumer should ideally integrate with just a single data repository that would give her access to everything.

The idea is that adding a new data system—be it a data source or a data destination—should create integration work only to connect it to a single pipeline instead of to each consumer of data.

This experience led me to focus on building Kafka to combine what we had seen in messaging systems with the log concept popular in databases and distributed system internals. We wanted something to act as a central pipeline first for all activity data, and eventually for many other uses, including data deployment out of Hadoop, monitoring data, and so on.

For a long time, Kafka was a little unique (some would say odd) as an infrastructure product—neither a database nor a log file collection system nor a traditional messaging system. But recently, Amazon has offered a service that is very similar to Kafka called Kinesis. The similarity goes right down to the way partitioning is handled and data is retained, as well as the fairly odd split in the Kafka API between high- and low-level consumers. I was pretty happy about this. A sign you’ve created a good infrastructure abstraction is that Amazon offers it as a service! Their vision for this seems to be similar to what I am describing: it is the piping that connects all their distributed systems—DynamoDB, RedShift, S3—as well as the basis for distributed stream processing using EC2. Google has followed with a data stream and processing framework, and Microsoft has started to move in the same direction with their Azure Service Bus offering.

###Relationship to ETL and Data Warehouse

Let’s talk data warehousing for a bit. The data warehouse is meant to be a repository for the clean, integrated data structured to support analysis. This is a great idea. For those not in the know, the data warehousing methodology involves periodically extracting data from source databases, munging it into some kind of understandable form, and loading it into a central data warehouse. Having this central location that contains a clean copy of all your data is a hugely valuable asset for data-intensive analysis and processing. At a high level, this methodology doesn’t change too much whether you use a traditional data warehouse like Oracle, Teradata, or Hadoop, although you might switch up the order of loading and munging.

A data warehouse containing clean, integrated data is a phenomenal asset, but the mechanics of getting this are a bit out of date.

The key problem for a data-centric organization is coupling the clean integrated data to the data warehouse. A data warehouse is a piece of batch query infrastructure that is well suited to many kinds of reporting and ad hoc analysis, particularly when the queries involve simple counting, aggregation, and filtering. But having a batch system be the only repository of clean, complete data means the data is unavailable for systems requiring a real-time feed: real-time processing, search indexing, monitoring systems, and so on.

ETL is really two things. First, it is an extraction and data cleanup process, essentially liberating data locked up in a variety of systems in the organization and removing any system-specific nonsense. Secondly, that data is restructured for data warehousing queries (that is, made to fit the type system of a relational database, forced into a star or snowflake schema, perhaps broken up into a high performancecolumn format, and so on). Conflating these two roles is a problem. The clean, integrated repository of data should also be available in real time for low-latency processing, and for indexing in other real-time storage systems.

###ETL and Organizational Scalability

I think this has the added benefit of making data warehousing ETL much moreorganizationally scalable. The classic problem of the data warehouse team is that they are responsible for collecting and cleaning all the data generated by every other team in the organization. The incentives are not aligned; data producers are often not very aware of the use of the data in the data warehouse and end up creating data that is hard to extract or requires heavy, hard-to-scale transformation to get it into usable form. Of course, the central team never quite manages to scale to match the pace of the rest of the organization, so data coverage is always spotty, data flow is fragile, and changes are slow.

A better approach is to have a central pipeline, the log, with a well-defined API for adding data. The responsibility of integrating with this pipeline and providing a clean, well-structured data feed lies with the producer of this data feed. This means that as part of their system design and implementation, they must consider the problem of getting data out and into a well-structured form for delivery to the central pipeline. The addition of new storage systems is of no consequence to the data warehouse team, as they have a central point of integration. The data warehouse team handles only the simpler problem of loading structured feeds of data from the central log and carrying out transformation specific to their system (see Figure 2-7).

chapter02 f7

Figure 2-7. There is a human element to data flow as well. This is one of key aspects of democritizing access to data.

This point about organizational scalability becomes particularly important when an organization considers adopting additional data systems beyond a traditional data warehouse. Say, for example, that you wish to provide search capabilities over the complete data set of the organization. Or, say that you want to provide sub-second monitoring of data streams with real-time trend graphs and alerting. In either case, the infrastructure of the traditional data warehouse or even a Hadoop cluster will be inappropriate. Worse, the ETL processing pipeline built to support database loads is likely of no use for feeding these other systems, making bootstrapping these pieces of infrastructure as large an undertaking as adopting a data warehouse. This likely isn’t feasible and probably helps explain why most organizations don’t have these capabilities easily available for all their data. By contrast, if the organization had built out feeds of uniform, well-structured data, getting any new system full access to all data requires only a single bit of integration plumbing to attach to the pipeline.

###Where Should We Put the Data Transformations?

This architecture also raises a set of different options for where a particular cleanup or transformation can reside:

  • It can be done by the data producer prior to adding the data to company-wide log.
  • It can be done as a real-time transformation on the log (which in turn produces a new, transformed log).
  • It can be done as part of the load process into some destination data system.

The best model is to have the data publisher do cleanup prior to publishing the data to the log. This means ensuring that the data is in a canonical form and doesn’t retain any holdovers from the particular code that produced it or the storage system in which it might have been maintained. These details are best handled by the team that creates the data since that team knows the most about its own data. Any logic applied in this stage should be lossless and reversible.

Any kind of value-added transformation that can be done in real-time should be done as post-processing on the raw log feed that was produced. This would include things like sessionization of event data, or the addition of other derived fields that are of general interest. The original log is still available, but this real-time processing produces a derived log containing augmented data.

Finally, only aggregation that is specific to the destination system should be performed as part of the loading process. This might include transforming data into a particular star or snowflake schema for analysis and reporting in a data warehouse. Because this stage, which most naturally maps to the traditional ETL process, is now done on a far cleaner and more uniform set of streams, it should be much simplified.

###Decoupling Systems

Let’s talk a little bit about a side benefit of this architecture: it enables decoupled, event-driven systems.

The typical approach to activity data in the web industry is to log it out to text files where it can be scrapped into a data warehouse or into Hadoop for aggregation and querying. The problem with this is the same as the problem with all batch ETL: it couples the data flow to the data warehouse’s capabilities and processing schedule.

At LinkedIn, we have built our event data handling in a log-centric fashion. We are using Kafka as the central, multisubscriber event log (see Figure 2-8). We have defined several hundred event types, each capturing the unique attributes about a particular type of action. This covers everything from page views, ad impressions, and searches to service invocations and application exceptions.

chapter02 f8

Figure 2-8. What happens when you view a job on LinkedIn? The system is responsible for displaying the job records, and each system that needs to be aware of this subscribes to the stream of all job views and reacts appropriatedly in its own timeframe.

To understand the advantages of this, imagine a simple event showing a job posting on the job page. The job page should contain only the logic required to display the job. However, in a fairly dynamic site, this could easily become tangled up with additional logic unrelated to showing the job. For example, let’s say we need to integrate the following systems:

  • We need to send this data to Hadoop and the data warehouse for offline processing purposes
  • A security system needs to count the view to ensure that the viewer is not attempting some kind of content scraping.
  • We need to aggregate this view for display in the job poster's analytics page.
  • The job recommendation system need to record the view to ensure that we properly impression cap any job recommendations for that user (we don't want to show the same thing over and over).
  • Monitoring systems need to track the display rate and application reate for jobs to ensure that the system is functioning well.

Pretty soon, the simple act of displaying a job has become quite complex. As we add other places where jobs are displayed—mobile applications and so on—this logic must be carried over and the complexity increases. Worse, the systems that we need to interface with are now somewhat intertwined—the person working on displaying jobs needs to know about many other systems and features and make sure they are integrated properly. This is just a toy version of the problem, and any real application would be more complex, not less.

The event-driven style provides an approach to simplifying this. The job display page now just shows a job and records the fact that a job was shown along with the relevant attributes of the job, the viewer, and any other useful facts about the display of the job. Each of the other interested systems—the recommendation system, the security system, the job poster analytics system, and the data warehouse—all just subscribe to the feed and do their processing. The display code does not need to be aware of these other systems or changed if a new data consumer is added.

###Scaling a Log

Of course, separating publishers from subscribers is nothing new. However, if you want to keep a commit log that acts as a multisubscriber real-time journal of everything happening on a consumer-scale website, scalability will be a primary challenge. Using a log as a universal integration mechanism is never going to be more than an elegant fantasy if we can’t build a log that is fast, cheap, and scalable enough to be practical in this domain.

Distributed systems people often think of a distributed log as a slow, heavyweight abstraction (and usually associate it only with the kind of metadata uses for which Zookeeper might be appropriate). With a thoughtful implementation focused on journaling large data streams, this need not be true. As an example, LinkedIn writes hundreds of billions of messages to production Kafka clusters each day.

We used a few tricks in Kafka to support this kind of scale:

  • Partitioning the log
  • Optimizing throughput by batching reads and writes
  • Avoiding needless data copies

In order to allow horizontal scaling, we chop up our log into partitions, as shown inFigure 2-9.

chapter02 f9

Figure 2-9. By partitioning the log, we allow each partition to act independently of all other partitions. This lets us horizontally scale the write throughput.

Each partition is a totally ordered log, but there is no global ordering between partitions (other than perhaps some wall-clock time you might include in your messages). The writer controls the assignment of the messages to a particular partition, with most users choosing to partition by some kind of key (such as a user ID). Partitioning allows log appends to occur without coordination between shards, and allows the throughput of the system to scale linearly with the Kafka cluster size while still maintaining ordering within the sharding key.

Each partition is replicated across a configurable number of replicas, each of which has an identical copy of the partition’s log. At any time, a single partition will act as the leader; if the leader fails, one of the replicas will take over as leader.

Lack of global order across partitions is a limitation, but we have not found it to be a major one. Indeed, interaction with the log typically comes from hundreds or thousands of distinct processes, so it is not meaningful to talk about a total order over their behavior. Instead, the guarantees that we provide are that each partition is order preserving, and Kafka guarantees that appends to a particular partition from a single sender will be delivered in the order they are sent.

A log, like a filesystem, is easy to optimize for linear read and write patterns. The log can group small reads and writes together into larger, high-throughput operations. Kafka pursues this optimization aggressively. Batching occurs from client to server when sending data, in writes to disk, in replication between servers, in data transfer to consumers, and in acknowledging committed data.

Finally, Kafka uses a simple binary format that is maintained between in-memory log, on-disk log, and in-network data transfers. This allows us to make use of numerous optimizations, including zero-copy data transfer.

The cumulative effect of these optimizations, is that you can usually write and read data at the rate supported by the disk or network, even while maintaining data sets that vastly exceed memory. For example, a single thread can write 100-byte messages at a rate of about 750k messages per second, each being stored with 3x replication. Reading is even faster at about 900k messages per second. More benchmarks are available.

This write-up isn’t meant to be primarily about Kafka so I won’t go into further details. You can read a more detailed overview of LinkedIn’s approach and a thorough overview of Kafka’s design.