Skip to content

Writing a Custom Stage

A stage is a single, replaceable unit of work in the pipeline. When the six built-ins — walk → parse → chunk → relate → enrich → embed-pack — don’t cover something you need (a redaction pass before enrichment, a custom dedup step, a domain-specific relation resolver), you write your own stage and slot it in. This guide shows the Stage protocol, the hard rules every stage must follow, and how to manage the stage list on a DirectoryPipeline.

A stage is any object that has a name and a run method. There is no base class to inherit from — indx uses structural typing, so anything that fits the shape “drops straight in”.

from typing import Protocol, runtime_checkable
from indx import SpaceContext
@runtime_checkable
class Stage(Protocol):
name: str # stable identifier, e.g. "pii-redact"
def run(self, ctx: SpaceContext) -> SpaceContext: ...

Two members, both mandatory:

MemberTypePurpose
namestrStable identifier, used to address the stage in replace() / drop() and in progress output. Use a lowercase, hyphenated value (e.g. pii-redact).
run(ctx: SpaceContext) -> SpaceContextDoes the work and returns the context.

The pipeline executes stages in registration order, passing the context out of each stage straight into the next. Whatever earlier stages produced is already on ctx when your stage runs — the directory graph from Walk, ctx.parsed from Parse, ctx.chunks from Chunk, ctx.relations from Relate, enriched metadata from Enrich, and ctx.embeddings from Embed+Pack.

A redaction pass is the canonical custom stage. Run it after Chunk so chunk text exists, and before any egress-capable component (an external enrichment LLM) can see the raw content — redaction is a first-class privacy extension point in indx.

from indx import SpaceContext
class PiiRedactStage:
name = "pii-redact"
def run(self, ctx: SpaceContext) -> SpaceContext:
for chunk in ctx.chunks:
chunk.text = redact(chunk.text)
return ctx # MUST return the same context

Insert it at index 3 — it lands after chunk (index 2) and before relate (which shifts from index 3 to 4):

pipeline.insert(3, PiiRedactStage()) # run after Chunk, before Relate

The default stage order is 0:walk, 1:parse, 2:chunk, 3:relate, 4:enrich, 5:embed-pack. Inserting at index 3 places your stage between Chunk and Relate, so by the time the external LLM in Enrich runs, every chunk’s text is already redacted.

Custom stages obey the same contract as the built-ins. These are enforceable rules, not suggestions.

Read your inputs from the context and write your outputs back to it. No module-level globals, no side channels, no shared mutable singletons. The context is the single source of truth for everything the pipeline has accumulated.

# ❌ side channel — invisible to other stages, breaks resume & determinism
GLOBAL_CHUNKS.extend(new_chunks)
# ✅ everything flows through ctx
for chunk in self._dedup(ctx.chunks):
...

Re-running a stage on its own output must not corrupt state or duplicate work. Skip what’s already recorded on the context (or in the resume cache) rather than redoing it. This is what makes --resume cheap and keeps index.json byte-stable across reruns.

A per-item failure must be visible. You have two options per stage — pick one, but never use an empty except:

  • Raise a typed error (an IndxError subclass such as StageError) with an actionable message: what failed, on which item/stage, and what to do about it. This is the fail-loud path for fatal problems.
  • Record a non-fatal failure on ctx.errors and continue. Append a typed StageError so the item is skipped visibly rather than disappearing.
from indx.core.errors import StageError
def run(self, ctx: SpaceContext) -> SpaceContext:
for chunk in ctx.chunks:
try:
chunk.text = redact(chunk.text)
except RedactionError as exc:
ctx.errors.append(
StageError(
stage=self.name,
item=chunk.id,
kind="skip",
message=f"could not redact {chunk.id}",
detail=str(exc),
)
)
return ctx

StageError carries stage, item (e.g. the file path or chunk id), kind ("skip" or "fatal"), message, and an optional detail. Entries on ctx.errors surface on the resulting space under space.metadata["errors"]. Under --strict (or strict=True), every skip is promoted to fatal. See errors and exit codes for the full hierarchy.

Report progress through the shared reporting hook so the user sees per-stage output (Rich progress on the CLI, structured logs otherwise). Don’t run silently and don’t print() — library code logs and reports; it never prints.

A stage must function without assuming a specific neighbor implementation. Don’t depend on a particular parser, store, or upstream stage being present — depend on the shape of the context. A well-behaved stage can be inserted, reordered, or dropped without breaking the rest of the pipeline.

A fresh DirectoryPipeline registers the six built-ins in canonical order. Four methods let you reshape that list; each returns the pipeline for chaining.

MethodEffect
insert(index, stage)Insert a stage at a 0-based position.
append(stage)Add a stage to the end of the pipeline.
replace(name, stage)Swap out the stage with the given name.
drop(name)Remove the named stage entirely.
stages()Return the current ordered list of stages (handy for checking indices).
from indx import DirectoryPipeline
pipeline = DirectoryPipeline(embedder="bge-m3", store="qdrant")
# Inspect current order
for i, s in enumerate(pipeline.stages()):
print(i, s.name) # 0 walk, 1 parse, 2 chunk, 3 relate, 4 enrich, 5 embed-pack
# Add a redaction pass before Relate, drop LLM enrichment, then run
space = (
pipeline
.insert(3, PiiRedactStage()) # after Chunk, before Relate
.drop("enrich") # skip LLM work entirely
.run("./docs", "./ai-ready")
)

A few common patterns:

  • insert(3, …) — a redaction or normalization pass between Chunk and Relate.
  • append(MyExportStage()) — an extra export or notification step after the archive is sealed.
  • replace("relate", CustomRelationStage()) — your own relation resolver in place of the built-in.
  • drop("enrich") — skip LLM enrichment when no model is available or wanted. drop("embed-pack") yields a graph-only space with no vectors.

Inserting a stage in code is perfect for project-specific logic. To make a stage installable and resolvable by name anywhere a built-in is, register it as an entry-point plugin under the indx.stages group:

# pyproject.toml of your package
[project.entry-points."indx.stages"]
pii-redact = "my_pkg.stages:PiiRedactStage"

Once installed, indx discovers it at import and merges it into the registry. See authoring a plugin for the full packaging recipe, including extras, lazy imports, and contract tests.