Feep! » Blog » Post

The ingestion pipeline

I've built a data processing system for working with streams of pages; it seems like it's pretty solid now so let's take a look at how it works.

(Note: this post is backdated; it was written Jan 17, 2022.)

The purpose of this system is to get pages from a datasource, process them in various ways, and pass each page through to an "ingestor" which will consume them, for example to put them into ElasticSearch. Before I built this system, I was shuffling this data around with shell pipelines, which has several disadvantages:

The ingestion system introduces "processors" in the middle to take care of matching what the source provides to what the ingestor needs. Overall, it looks like this:

flowchart LR subgraph processors end A(source) --> processors --> B(ingestor) style processors fill:#ffffde,stroke:#aaaa33,stroke-width:1px

A source has a simple signature, consisting of an async function*() which will yield an object for each page, and a list of keys that each object will contain:

module.exports = {
	source: async function*(...args) {
		yield* pages
	returns: ['url', 'title', 'body_html'],

An ingestor has a similar signature, a function which iterates over each object and a list of keys the object must contain:

module.exports = {
	ingest: async function (source) {
		for await (const page of source) {
			// Do things
	requires: ['url', 'links'],

The complicated bit is figuring out how to wire them together, to get from returns to requires. This is done by “processors” (currently one for each key), which define a function for adding the key to the object, and also have dependencies of their own:

// process/body_qdom.js
const parse_html = require('node-html-parser')

module.exports = {
	requires: ['body_html'],
	run: page => {
		page.body_qdom = parse_html.parse(page.body_html, {pre: true})

Together, these form a directed graph which currently looks something like this:

flowchart LR U([url]) --> id{{id}} H([body_html]) --> Q{{body_qdom}} Q --> T{{body_text}} Q --> L{{links}}

It's not much of a DAG, but there it is. I expect there will be more entries in the future.

Anyway, however trivial it is, I still want to have this figured out for me. Here's the code that wires everything together:

const processors = []
const have = input.returns
function resolve_reqs(proc) {
	for (const req of proc.requires) {
		if (!have.includes(req)) {
			const proc = require('./process/'+req)

const iterutil = require('./util/iterutil')

async function run() {
	await iterutil.chain(
		_ => iterutil.tap(_, v => processors.forEach(p => p.run(v))),
		_ => output.ingest(_, ...output_args),

run().then(null, e => {throw e})

This code works backwards from the requirements, adding new processors until all dependencies have been fulfilled. (It's a very simple topological sort.) Once it's figured out what order it needs to run them all in, it threads them together with a utility called iterutil.chain that takes an iterator and passes each object through a series of steps. (This collection of utilities has proven very useful, I should consider publishing it.)

Combining this with the diagrams above, the result might look something like this:

flowchart LR subgraph processors direction LR U([url]) ---> id{{id}} H([body_html]) --> Q{{body_qdom}} Q --> T{{body_text}} end A(stackexchange) --> processors --> B(elastic) style processors fill:#ffffde,stroke:#aaaa33,stroke-width:1px

It feels a bit like a self-assembling LEGO set. I just write out whichever task I want done, and so long as all the pieces are there they snap themselves together automatically.