
A unified data processing framework

I've been managing the process of importing StackExchange posts with an increasingly hacky and annoying shell script. Today I built out what I hope will be a useful replacement, though I'm still not entirely happy with it.

(Note: this post is backdated; it was written Jan 17, 2022.)

The procedure for getting a StackExchange site into the index looks something like this:

graph TD
  subgraph download
    Z{{wget}} --> A
  end
  subgraph process
    A[.7z] --> B{{"7z e Posts.xml | extract_xml.py"}}
    B --> C[.questions.jsonl]
    B --> D[.answers.jsonlx]
    B ----> E[.aqmap.dat]
    D --> F{{"sort | cut"}}
    F --> G[.answers.jsonl]
  end
  subgraph -
    G --> H[[source]]
    C --> H
    E -- "*.aqmap.dat" --> I[[canonicalize]]
  end
  subgraph getlinks
    H --> GL{{ingest/output_links.js}} --> LJ[.links.json.gz]
  end
  subgraph genlinkdat
    LJ & I --> P1{{generate_link_dat.js}} --> P2[.links.dat]
  end
  subgraph pageranks_calc
    P2 -- "*.links.dat" --> P3{{"pageranks_calc.js"}} --> P4[pageranks.dat]
  end
  subgraph elasticimport
    H & P4 --> E1{{ingest/output_elastic.js}} --> EX[(index)]
  end
  style - fill:none,stroke:none

Did you wear out your scrollbar going past that? Yeah, it's a lot of moving pieces. Each of those boxes represents a shell script, and they all have to be run in the right order, separately for each StackExchange site, while also keeping track of the points in the flow where one step needs the data from every site at once. Plus, this is only one of the many datasources I expect to have. Oh, and sometimes I only want to run part of the process for debugging. I've been coordinating all of this with some shell scripts, but it's still clunky and hard to keep track of.

In an attempt to simplify this, I've built what I call the "dispatcher", which represents this process as a complete DAG (minus the Elastic part, which I haven't built in yet) and can track dependencies. This is designed along the same lines as the ingestion processor system, which has worked very well, though I don't feel like I've got the abstraction quite as nice this time.

The idea is that the dispatcher knows about every task. I've shown the diagram above with a single StackExchange site, but for example in the dispatcher's DAG the pageranks_calc task depends on 60 different genlinkdat tasks, one for each site. Each task has an execution method, a list of dependencies (possibly with wildcards), and some way to check if it's stale. (Of course a task is also considered stale if any of its dependencies are stale.)

The first step that happens when the dispatcher runs is that it calculates a list of all the tasks and their dependencies. (This happens at runtime so that I can have the code dynamically update, for example as more StackExchange sites are added.) Once it has the list, it iterates through it and checks for stale tasks. (It also makes a list of reasons why the task is considered stale, such as “missing output file X” or “older than dependency Y”, to help with debugging.) Finally it does a simple topological sort (make sure each of a task's dependencies is in the execution list before appending that task to the list) and begins executing—which currently consists mainly of a call to child_process.spawn, though I have plans to make it more integrated.
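The ordering step can be sketched roughly like this (my own reconstruction, not the dispatcher's actual code): append a task to the execution list only once all of its dependencies are already in the list.

```javascript
// tasks: Map of id -> { deps: [ids] }. Returns ids in dependency order.
function topoSort(tasks) {
  const order = [];
  const placed = new Set();   // tasks already in the execution list
  const visiting = new Set(); // for cycle detection

  function visit(id) {
    if (placed.has(id)) return;
    if (visiting.has(id)) throw new Error("dependency cycle at " + id);
    visiting.add(id);
    // Make sure every dependency is in the list before this task.
    for (const dep of tasks.get(id).deps) visit(dep);
    visiting.delete(id);
    placed.add(id);
    order.push(id);
  }

  for (const id of tasks.keys()) visit(id);
  return order;
}
```

A real run would then walk this list, skipping tasks that aren't stale.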

I also have some flags for filtering and querying, so I can do things like ask it which tasks are currently stale and why:

$ node ./dispatcher.js status --stale
datasource:stackexchange,subset:stackoverflow.com,action:download stale (missing output file stackoverflow.com-Posts.7z)
datasource:stackexchange,subset:stackoverflow.com,action:extract-posts stale (stale dep datasource:stackexchange,subset:stackoverflow.com,action:download)
datasource:stackexchange,subset:stackoverflow.com,action:sort-posts stale (stale dep datasource:stackexchange,subset:stackoverflow.com,action:extract-posts)
datasource:stackexchange,subset:stackoverflow.com,action:getlinks stale (stale dep datasource:stackexchange,subset:stackoverflow.com,action:sort-posts)
datasource:stackexchange,subset:askubuntu.com,action:getlinks stale (older than dep datasource:stackexchange,subset:askubuntu.com,action:sort-posts)

or have it only run the downloads but skip everything else for now:

$ node ./dispatcher.js exec datasource:stackexchange,action:download

Right now each task is executed in a separate process and all of the data that's passed between tasks is stored in files, so this is mostly an overcomplicated way of emulating make. However, I have plans to extend this system to support streaming results, which will let me save on disk space (since I won't need to store intermediate files) and also improve performance (by avoiding serialization/deserialization overhead). It'll need a bit more thinking to figure out how to set things up so these plans work well, but I have high hopes for this scheme.