The ingestion pipeline
I've built a data processing system for working with streams of pages; it seems pretty solid now, so let's take a look at how it works.
(Note: this post is backdated; it was written Jan 17, 2022.)
The purpose of this system is to get pages from a datasource, process them in various ways, and pass each one through to an "ingestor" that consumes it, for example by putting it into ElasticSearch. Before I built this system, I was shuffling this data around with shell pipelines, an approach with several disadvantages:
- It requires serializing and deserializing the data to pass it through the pipeline, reducing performance.
- It requires that the two ends agree on exactly which properties the source provides and how to compute the rest, and that isn't consistent across sources. For example, one datasource might know each page's title already, while another might require HTML parsing to find it. Meanwhile, the ingestor might not need the title at all, so parsing the HTML to get it would just be wasted work.
The ingestion system introduces "processors" in the middle to take care of matching what the source provides to what the ingestor needs. Overall, it looks like this:
A source has a simple signature, consisting of an async function*() which will yield an object for each page, and a list of keys that each object will contain:
module.exports = {
  source: async function*(...args) {
    yield* pages  // yield one plain object per page
  },
  returns: ['url', 'title', 'body_html'],
}
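For a concrete (and entirely hypothetical) example, a source that walks a directory of saved HTML files might look something like this; the filename and keys here are just illustrative:

// source/local_html.js (hypothetical example, not from the real codebase)
const fs = require('fs/promises')
const path = require('path')

module.exports = {
  source: async function* (dir) {
    for (const name of await fs.readdir(dir)) {
      if (!name.endsWith('.html')) continue
      const file = path.join(dir, name)
      yield {
        url: 'file://' + file,
        body_html: await fs.readFile(file, 'utf8'),
      }
    }
  },
  returns: ['url', 'body_html'],
}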
An ingestor has a similar signature, a function which iterates over each object and a list of keys the object must contain:
module.exports = {
  ingest: async function (source) {
    for await (const page of source) {
      // Do things
    }
  },
  requires: ['url', 'links'],
}
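To make that concrete, here's a toy ingestor with this signature; it only reports link counts (the real one would push documents into ElasticSearch or similar), and the filename is made up:

// ingest/link_report.js (hypothetical example)
module.exports = {
  ingest: async function (source) {
    for await (const page of source) {
      console.log(page.url + ': ' + page.links.length + ' outbound links')
    }
  },
  requires: ['url', 'links'],
}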
The complicated bit is figuring out how to wire them together, to get from returns to requires. This is done by “processors” (currently one for each key), which define a function for adding the key to the object, and also have dependencies of their own:
// process/body_qdom.js
const parse_html = require('node-html-parser')

module.exports = {
  requires: ['body_html'],
  run: page => {
    // Parse the raw HTML into a queryable DOM object
    page.body_qdom = parse_html.parse(page.body_html, {pre: true})
  },
}
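The links key that the ingestor above asks for would come from another processor built the same way. This one is my own sketch rather than the actual file, but it shows how processors chain: it depends on body_qdom, which in turn depends on body_html:

// process/links.js (a sketch following the same pattern, not the actual file)
module.exports = {
  requires: ['body_qdom'],
  run: page => {
    // Collect every href from the DOM built by the body_qdom processor
    page.links = page.body_qdom
      .querySelectorAll('a')
      .map(a => a.getAttribute('href'))
      .filter(href => href)
  },
}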
Together, these form a directed graph which currently looks something like this:
It's not much of a DAG, but there it is. I expect there will be more entries in the future.
Anyway, however trivial it is, I still want to have this figured out for me. Here's the code that wires everything together:
// input is the source module and output is the ingestor module
// (loaded elsewhere, along with input_args and output_args).
const processors = []
const have = input.returns

// Work backwards from a module's requirements, loading a processor for each
// missing key and resolving that processor's own requirements first.
function resolve_reqs(proc) {
  for (const req of proc.requires) {
    if (!have.includes(req)) {
      const dep = require('./process/' + req)
      resolve_reqs(dep)
      have.push(req)
      processors.push(dep)
    }
  }
}
resolve_reqs(output)

const iterutil = require('./util/iterutil')
async function run() {
  await iterutil.chain(
    input.source(...input_args),
    _ => iterutil.tap(_, v => processors.forEach(p => p.run(v))),
    _ => output.ingest(_, ...output_args),
  )
}
run().then(null, e => {throw e})
This code works backwards from the requirements, adding new processors until all dependencies have been fulfilled. (It's a very simple topological sort.) Once it's figured out what order it needs to run them in, it threads them together with a utility called iterutil.chain, which takes an iterator and passes each object through a series of steps. (This collection of utilities has proven very useful; I should consider publishing it.)
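I haven't shown iterutil here, but from the call sites above, chain and tap behave roughly like the following sketch (inferred from usage, not the real implementation):

// A minimal sketch of the two helpers, inferred from how they're called above.
// chain(iter, ...steps) applies each step to the result of the previous one,
// starting from the source iterator; tap(iter, fn) runs a side effect on every
// item and yields it through unchanged.
exports.chain = (iter, ...steps) =>
  steps.reduce((acc, step) => step(acc), iter)

exports.tap = async function* (iter, fn) {
  for await (const item of iter) {
    fn(item)
    yield item
  }
}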
Combining this with the diagrams above, the result might look something like this:
It feels a bit like a self-assembling LEGO set. I just write out whichever task I want done, and so long as all the pieces are there they snap themselves together automatically.