Optimizing Storage with Immutable Text Data

by Dominic Burkart
August 24th, 2020

I've been working on an open science project involving all edits to Wikipedia. For my analysis, I needed some worker computers to have all of the edits from a specific time period. At first, I thought about setting up a Postgres database that all of the workers could query. But it soon became clear that Postgres was not the right tool: it would take around 60 TB to store all of the edits, so renting a server that could handle the full database on AWS or similar would cost over $1000/month. I needed an alternative, and started looking for ways to cut the storage requirements by a factor of ten.

Fundamentally, mutable database architectures require a lot of space because they are dealing with large amounts of mutable data, often with complex indexing needs. Being able to write to and edit different parts of the database without causing concurrency errors is super powerful, but it comes with drawbacks, as does being able to query on any field. In our application, however, we don't need these superpowers: the data is immutable, and we only care about two ways of retrieving it (by ID or by time). We don't have to worry about uniform column/row lengths, or about figuring out how to keep all of our indices up-to-date, which is a critical difference.

While we don't have to worry about mutability, we do care about concurrency: multiple workers should be able to request the same data at once. Fortunately, the file systems in modern operating systems are great for this: a large number of processes can read from the same file at the same time, as long as nothing is trying to write to it. The ability to read and write concurrently is one of the main reasons databases were created in the first place; without the need for concurrent writes, file systems start to look interesting again.
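
As a quick illustration (a throwaway sketch, not project code; the file name is hypothetical), any number of threads or processes can open and read the same file at once, with no coordination beyond what the OS already provides:

use std::fs::File;
use std::io::Read;
use std::thread;

fn main() {
  // four readers open the same file independently and read it at the same time;
  // no locking is needed because nothing is writing to the file
  let handles: Vec<_> = (0..4)
    .map(|i| {
      thread::spawn(move || {
        let mut bytes = Vec::new();
        File::open("revisions.dat") // hypothetical data file
          .expect("could not open file")
          .read_to_end(&mut bytes)
          .expect("could not read file");
        println!("reader {} saw {} bytes", i, bytes.len());
      })
    })
    .collect();
  for handle in handles {
    handle.join().unwrap();
  }
}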

[Image: Lex Murphy, a character from Jurassic Park, fighting killer dinosaurs by navigating a file system.]

file systems are extremely cool.

However, another reason databases became popular is that file systems often have a hard time keeping track of a large number of small files. Since storing each Wikipedia revision in its own file was off the table, I would have to store revisions in a small number of files and keep track of where in each file a given revision is stored. If you haven't worked with I/O streams before, this might sound like you would have to open a file and read every byte, starting at position 0, all the way to the correct offset. Fortunately, this is not the case! A lot of fundamental parts of the OS and of important applications (including basically every database) rely on being able to perform random seeks.

So, storing stuff within a file is actually a lot like storing stuff within the heap: all you need is an address (an offset), which you can use to access anything in O(1) time. (Note: this is a bit of a fib. Neither memory nor storage retrieval is actually O(1) in practice, because of OS- and application-level file buffers, and because of how memory is implemented in DRAM and how storage is implemented in HDDs/SSDs. What we mean here is that we can't really know how complex a given retrieval will be without knowing the system context at runtime, so we trust the operating system to optimize I/O and treat it as O(1) from the perspective of the application.)
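
Here's a minimal sketch of what that looks like with Rust's standard library (the read_at helper is hypothetical, just for illustration): we jump straight to a byte offset with Seek and read a known number of bytes, without touching anything before the offset.

use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

/// Hypothetical helper: read `length` bytes starting at byte `offset`,
/// without reading anything that comes before the offset.
fn read_at(path: &str, offset: u64, length: usize) -> std::io::Result<Vec<u8>> {
  let mut f = File::open(path)?;
  f.seek(SeekFrom::Start(offset))?; // jump straight to the offset: no scan from byte 0
  let mut buf = vec![0u8; length];
  f.read_exact(&mut buf)?;
  Ok(buf)
}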

With the file system doing the heavy lifting, the program started taking shape. When writing each revision, the program notes the revision ID, the current size of the data file (in other words, the offset at which the new bytes begin), and the length of the bytes being written. The ID, offset, and length are added to the index store. Since the written bytes are compressed, retrieving a revision would require the computer to (see the sketch after this list):

  1. Find the offset and length based on the revision ID from the index store.
  2. Open the relevant data file (if it is not already open).
  3. Go to the correct offset.
  4. Read "length" number of bytes.
  5. Decompress the bytes into a revision.
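
Roughly, the write and read paths look something like the sketch below. This is a simplification, not the real server code: the in-memory HashMap stands in for the persistent index store, the names (append_revision, read_revision, RevisionID as a u64) are illustrative, and revisions are assumed to already be Brotli-compressed before they are appended.

use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};

type RevisionID = u64; // assumption: revision IDs fit in a u64

// hypothetical in-memory stand-in for the index store: ID -> (offset, length)
type IndexStore = HashMap<RevisionID, (u64, u64)>;

/// Append one already-compressed revision to the data file and record where it landed.
fn append_revision(
  data_path: &str,
  index: &mut IndexStore,
  id: RevisionID,
  compressed: &[u8],
) -> std::io::Result<()> {
  let mut f = OpenOptions::new().create(true).append(true).open(data_path)?;
  let offset = f.metadata()?.len(); // current file size = offset of the new record
  f.write_all(compressed)?;
  index.insert(id, (offset, compressed.len() as u64));
  Ok(())
}

/// Retrieve a revision by ID, following the steps in the list above.
fn read_revision(
  data_path: &str,
  index: &IndexStore,
  id: RevisionID,
) -> std::io::Result<Vec<u8>> {
  let (offset, length) = index[&id];    // 1. find offset and length in the index store
  let mut f = File::open(data_path)?;   // 2. open the relevant data file
  f.seek(SeekFrom::Start(offset))?;     // 3. go to the correct offset
  let mut compressed = vec![0u8; length as usize];
  f.read_exact(&mut compressed)?;       // 4. read `length` bytes
  let mut revision = Vec::new();
  brotli2::read::BrotliDecoder::new(&compressed[..]) // 5. decompress into a revision
    .read_to_end(&mut revision)?;
  Ok(revision)
}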

The next part of the project was to create a way for users to request revisions from a given time period. To do this, we note the time each revision was made and create a new index store that maps dates to revisions. By using a binary tree mapping, we can efficiently query time periods to get revision IDs, and then use the IDs to collect the revisions themselves. To avoid large memory requirements, we generate several hundred smaller binary trees instead of one large one. We can distribute revisions across the trees near-perfectly, since we know the first revision ID and the last revision ID, and we know that the IDs increase monotonically.
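
Because the IDs increase monotonically, balancing revisions across a fixed number of trees can be as simple as a proportional calculation on the ID. Here's one way that could look (a hypothetical sketch; the names and the tree count are illustrative, not from the project):

type RevisionID = u64; // assumption: revision IDs fit in a u64

const N_TREES: u64 = 512; // illustrative: "several hundred" smaller trees

/// Map a revision ID onto one of N_TREES roughly equal buckets,
/// given the known first and last IDs.
fn tree_index(id: RevisionID, first_id: RevisionID, last_id: RevisionID) -> u64 {
  let span = last_id - first_id + 1;
  // because IDs increase monotonically, each bucket ends up with a near-equal,
  // contiguous slice of revisions (and therefore a contiguous slice of time)
  (id - first_id) * N_TREES / span
}

The starting date of each tree then becomes a key in the starting_date_per_index map that shows up in the server code below.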

Finally, it was time to make more concrete choices. I decided that I wanted to write this server in Rust, to benefit from the language's performance, easy parallelization, and comprehensive type checking. I implemented the algorithm above and, since I would no longer be using the Postgres API, wrote the HTTP server interface using Actix.

Actix gave me a lot of nice features. Below is the code for the service that returns revisions from a specific time period when passed a pair of numeric Unix timestamps. It streams from an iterator, so it requires very little memory:

// ...

lazy_static! {
  pub static ref STATE: State = {
    let f = File::open(SUPER_DATE_BTREE_FILE).unwrap();
    let buf = BufReader::with_capacity(BUF_SIZE, f);
    let super_map: BTreeMap<Instant, String> =
      serde_json::from_reader(buf).unwrap();
    State {
        starting_date_per_index: super_map,
    }
  };

  // ...
}

pub struct State {
  starting_date_per_index: BTreeMap<Instant, String>,
}

impl State {

  // ...

  fn revision_ids_from_period(
    &self,
    start: Instant,
    end: Instant,
  ) -> impl Iterator<Item=Vec<RevisionID>> {
    // the prior start and trailing end are the
    // edges of the window of the trees that
    // contain the revisions from start to end.

    let included_window_start = self
      .starting_date_per_index
      .range((Unbounded, Included(start)))
      .map(|(instant, _path)| instant)
      .next_back()
      .unwrap_or(&start); // start is out of known range

    let included_window_end = self
      .starting_date_per_index
      .range(end..)
      .map(|(instant, _path)| instant)
      .next()
      .unwrap_or(&end); // end is out of known range

    self.starting_date_per_index
      .range((
        Included(included_window_start),
        Included(included_window_end),
      ))
      .map(move |(_date, path)| {
        let time_indices: BTreeMap<Instant, Vec<RevisionID>> = {
          let f = File::open(path).unwrap();
          let buf = BufReader::with_capacity(BUF_SIZE, f);
          let compressor = brotli2::read::BrotliDecoder::new(buf);
          serde_json::from_reader(compressor).unwrap()
        };
        time_indices
          .range(start..end)
          .map(|(_date, ids)| ids)
          .flatten()
          .copied()
          .collect()
      })
  }

// ...

  fn revisions_from_period(
    &self,
    start: Instant,
    end: Instant,
  ) -> impl Iterator<Item=Revision> {
    self.revision_ids_from_period(start, end)
      .flatten()
      .map(move |id| self.get_revision(id))
  }
}

#[get("/{start}/{end}/revisions")]
async fn get_revisions_for_period(
  info: web::Path<(UnixTimeStamp, UnixTimeStamp)>
) -> impl Responder {
  // convert from timestamps to fixed offsets
  let start =
    FixedOffset::east(0)
      .timestamp(info.0, 0); // arbitrary offset
  let end =
    FixedOffset::east(0)
      .timestamp(info.1, 0);

  // generate and return response stream
  let stream = iter_to_byte_stream(
    STATE.revisions_from_period(start, end)
  );
  HttpResponse::Ok().streaming(stream)
}

// ...

These streams are also Brotli- or Gzip-compressed, depending on which encodings the client's Accept-Encoding header says it will accept. This only took a single line while instantiating the server (marked with a comment):

#[actix_rt::main]
async fn server(bind: String) -> std::io::Result<()> {
  HttpServer::new(|| {
    ActixApp::new()
      .wrap(middleware::Compress::default()) // adds compression
      .service(get_revisions_for_period)
  })
  .keep_alive(45)
  .bind(&bind)?
  .run()
  .await
}

I was pretty happy with development in Rust. While there is substantially less documentation for Actix than for, say, Flask, the type system was really helpful in avoiding basic errors. Development took longer, but debugging was much shorter. The integration test was also easy to write and runs quickly, which was really nice. While Rust has a reputation for verbose syntax, I found there was limited code bloat compared to a Python implementation, since Actix and the Rust standard library are both very feature-rich.

It was fun to learn more about what you can do with a POSIX filesystem, and I succeeded in cutting my storage needs by a factor of ten. Instead of an AWS server with $1000/month in storage costs, the program can now run on a Raspberry Pi 4 with 4 GB of RAM, a 6 TB external hard drive, and a small SSD for the index stores. Throughput can scale considerably with faster processors, more cores, and more memory for the OS-level file read buffers. In the long term, I'd like to return to this code and generalize it into an application that can ingest arbitrary immutable data and produce a compressed data store like the one built here.

I developed this server in Docker, so installation should hopefully be pretty easy. The source code is available on GitHub if you'd like to try it out for yourself!


☄︎