Writing a Custom Stage
A stage is a single, replaceable unit of work in the pipeline. When the six built-ins — walk → parse → chunk → relate → enrich → embed-pack — don’t cover something you need (a redaction pass before enrichment, a custom dedup step, a domain-specific relation resolver), you write your own stage and slot it in. This guide shows the Stage protocol, the hard rules every stage must follow, and how to manage the stage list on a DirectoryPipeline.
The Stage protocol
A stage is any object that has a name and a run method. There is no base class to inherit from: indx uses structural typing, so anything that fits the shape drops straight in.
```python
from typing import Protocol, runtime_checkable

from indx import SpaceContext


@runtime_checkable
class Stage(Protocol):
    name: str  # stable identifier, e.g. "pii-redact"

    def run(self, ctx: SpaceContext) -> SpaceContext: ...
```

Two members, both mandatory:
| Member | Type | Purpose |
|---|---|---|
| name | str | Stable identifier, used to address the stage in replace() / drop() and in progress output. Use a lowercase, hyphenated value (e.g. pii-redact). |
| run | (ctx: SpaceContext) -> SpaceContext | Does the work and returns the context. |
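To see what structural typing means in practice, here is a minimal sketch that checks two classes against a protocol of the same shape. The SpaceContext dataclass, UppercaseStage, and MissingName are hypothetical stand-ins written for this example, not indx classes.

```python
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable


@dataclass
class SpaceContext:
    """Stand-in for indx.SpaceContext (assumption, for illustration only)."""
    chunks: list = field(default_factory=list)


@runtime_checkable
class Stage(Protocol):
    name: str

    def run(self, ctx: SpaceContext) -> SpaceContext: ...


class UppercaseStage:
    """Hypothetical stage: no base class, just the right shape."""
    name = "uppercase"

    def run(self, ctx: SpaceContext) -> SpaceContext:
        ctx.chunks = [text.upper() for text in ctx.chunks]
        return ctx


class MissingName:
    """Has run() but no name, so it does not satisfy the protocol."""

    def run(self, ctx: SpaceContext) -> SpaceContext:
        return ctx


conforms = isinstance(UppercaseStage(), Stage)  # True: both members present
rejected = isinstance(MissingName(), Stage)     # False: name is missing
```

Note that a runtime isinstance check against a protocol only confirms that the members exist. It does not verify the signature of run, so a static type checker remains the stronger guard.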
The pipeline executes stages in registration order, passing the context out of each stage straight into the next. Whatever earlier stages produced is already on ctx when your stage runs — the directory graph from Walk, ctx.parsed from Parse, ctx.chunks from Chunk, ctx.relations from Relate, enriched metadata from Enrich, and ctx.embeddings from Embed+Pack.
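The hand-off described above can be pictured as a simple loop. This is a sketch under stated assumptions, not the pipeline's actual executor: run_stages, ToyParse, ToyChunk, and the SpaceContext stand-in are hypothetical names used only here.

```python
from dataclasses import dataclass, field


@dataclass
class SpaceContext:
    """Stand-in for indx.SpaceContext; the real class has more fields."""
    parsed: list = field(default_factory=list)
    chunks: list = field(default_factory=list)


class ToyParse:
    name = "parse"

    def run(self, ctx):
        ctx.parsed = ["alpha beta", "gamma"]
        return ctx


class ToyChunk:
    name = "chunk"

    def run(self, ctx):
        # Relies only on what an earlier stage left on the context.
        ctx.chunks = [word for doc in ctx.parsed for word in doc.split()]
        return ctx


def run_stages(stages, ctx):
    """Run each stage in list order, feeding its result to the next."""
    for stage in stages:
        ctx = stage.run(ctx)
    return ctx


result = run_stages([ToyParse(), ToyChunk()], SpaceContext())
```

Swapping the two stages in the list would leave ctx.chunks empty, which is why the position you register a stage at matters.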
A worked example: PII redaction
A redaction pass is the canonical custom stage. Run it after Chunk so chunk text exists, and before any egress-capable component (an external enrichment LLM) can see the raw content. Redaction is a first-class privacy extension point in indx.
```python
from indx import SpaceContext


class PiiRedactStage:
    name = "pii-redact"

    def run(self, ctx: SpaceContext) -> SpaceContext:
        for chunk in ctx.chunks:
            chunk.text = redact(chunk.text)
        return ctx  # MUST return the same context
```

Insert it at index 3, so it lands after chunk (index 2) and before relate (which shifts from index 3 to 4):

```python
pipeline.insert(3, PiiRedactStage())  # run after Chunk, before Relate
```

The default stage order is 0:walk, 1:parse, 2:chunk, 3:relate, 4:enrich, 5:embed-pack. Inserting at index 3 places your stage between Chunk and Relate, so by the time the external LLM in Enrich runs, every chunk’s text is already redacted.
Stage rules
Custom stages obey the same contract as the built-ins. These are enforceable rules, not suggestions.
1. Communicate only through SpaceContext
Read your inputs from the context and write your outputs back to it. No module-level globals, no side channels, no shared mutable singletons. The context is the single source of truth for everything the pipeline has accumulated.
```python
# ❌ side channel: invisible to other stages, breaks resume & determinism
GLOBAL_CHUNKS.extend(new_chunks)

# ✅ everything flows through ctx
for chunk in self._dedup(ctx.chunks):
    ...
```

2. Be idempotent and resume-aware
Re-running a stage on its own output must not corrupt state or duplicate work. Skip what’s already recorded on the context (or in the resume cache) rather than redoing it. This is what makes --resume cheap and keeps index.json byte-stable across reruns.
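One way to get this behavior is to leave a marker on each item once it has been processed and skip marked items on later runs. The sketch below assumes a per-chunk metadata dict and a "redacted_by" key; both are hypothetical, as are the Chunk and SpaceContext stand-ins and the toy mask_digits helper.

```python
from dataclasses import dataclass, field


@dataclass
class Chunk:
    """Stand-in chunk; the metadata dict is an assumption."""
    id: str
    text: str
    metadata: dict = field(default_factory=dict)


@dataclass
class SpaceContext:
    """Stand-in for indx.SpaceContext."""
    chunks: list = field(default_factory=list)


def mask_digits(text: str) -> str:
    """Toy redaction: replace every digit with '#'."""
    return "".join("#" if ch.isdigit() else ch for ch in text)


class IdempotentRedactStage:
    name = "pii-redact"

    def __init__(self):
        self.processed = 0  # counts real work, to make skips observable

    def run(self, ctx: SpaceContext) -> SpaceContext:
        for chunk in ctx.chunks:
            if chunk.metadata.get("redacted_by") == self.name:
                continue  # already handled on a previous run
            chunk.text = mask_digits(chunk.text)
            chunk.metadata["redacted_by"] = self.name
            self.processed += 1
        return ctx


ctx = SpaceContext(chunks=[Chunk("c1", "call 555 0100"), Chunk("c2", "no digits")])
stage = IdempotentRedactStage()
stage.run(ctx)
first_pass = [c.text for c in ctx.chunks]
stage.run(ctx)  # second run finds every chunk marked and does nothing
second_pass = [c.text for c in ctx.chunks]
```

Because the marker lives on the context rather than on the stage instance, a fresh stage object in a resumed run would skip the same chunks.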
3. Never silently swallow errors
A per-item failure must be visible. You have two options per stage. Pick one, but never use an empty except:
- Raise a typed error (an IndxError subclass such as StageError) with an actionable message: what failed, on which item/stage, and what to do about it. This is the fail-loud path for fatal problems.
- Record a non-fatal failure on ctx.errors and continue. Append a typed StageError so the item is skipped visibly rather than disappearing.
```python
from indx.core.errors import StageError


def run(self, ctx: SpaceContext) -> SpaceContext:
    for chunk in ctx.chunks:
        try:
            chunk.text = redact(chunk.text)
        except RedactionError as exc:
            ctx.errors.append(
                StageError(
                    stage=self.name,
                    item=chunk.id,
                    kind="skip",
                    message=f"could not redact {chunk.id}",
                    detail=str(exc),
                )
            )
    return ctx
```

StageError carries stage, item (e.g. the file path or chunk id), kind ("skip" or "fatal"), message, and an optional detail. Entries on ctx.errors surface on the resulting space under space.metadata["errors"]. Under --strict (or strict=True), every skip is promoted to fatal. See errors and exit codes for the full hierarchy.
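The example above shows the record-and-continue path. For the fail-loud path, a rough sketch might look like the following. The IndxError and StageError classes here are local stand-ins that mirror the fields listed above, not the real classes from indx.core.errors, and require_text is a hypothetical helper.

```python
class IndxError(Exception):
    """Stand-in for the library's base error (assumption)."""


class StageError(IndxError):
    """Stand-in mirroring the documented fields; not the real class."""

    def __init__(self, *, stage, item, kind, message, detail=None):
        super().__init__(message)
        self.stage = stage
        self.item = item
        self.kind = kind
        self.message = message
        self.detail = detail


def require_text(stage_name: str, chunk_id: str, text):
    """Raise a fatal, actionable error when a chunk has no usable text."""
    if not isinstance(text, str):
        raise StageError(
            stage=stage_name,
            item=chunk_id,
            kind="fatal",
            message=(
                f"chunk {chunk_id} has no text to redact; "
                "check that ctx.chunks was populated with text"
            ),
            detail=f"got {type(text).__name__}",
        )
    return text


try:
    require_text("pii-redact", "c7", None)
except StageError as exc:
    caught = exc
```

The message names the item, the problem, and a next step, which is the shape the rule asks for.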
4. Emit progress
Report progress through the shared reporting hook so the user sees per-stage output (Rich progress on the CLI, structured logs otherwise). Don’t run silently, and don’t call print(). Library code logs and reports; it never prints.
5. Be replaceable and optional
A stage must function without assuming a specific neighbor implementation. Don’t depend on a particular parser, store, or upstream stage being present; depend only on the shape of the context. A well-behaved stage can be inserted, reordered, or dropped without breaking the rest of the pipeline.
Managing the stage list
A fresh DirectoryPipeline registers the six built-ins in canonical order. Four methods let you reshape that list, and each returns the pipeline for chaining. A fifth, stages(), returns the current order so you can inspect it.
| Method | Effect |
|---|---|
| insert(index, stage) | Insert a stage at a 0-based position. |
| append(stage) | Add a stage to the end of the pipeline. |
| replace(name, stage) | Swap out the stage with the given name. |
| drop(name) | Remove the named stage entirely. |
| stages() | Return the current ordered list of stages (handy for checking indices). |
```python
from indx import DirectoryPipeline

pipeline = DirectoryPipeline(embedder="bge-m3", store="qdrant")

# Inspect current order
for i, s in enumerate(pipeline.stages()):
    print(i, s.name)  # 0 walk, 1 parse, 2 chunk, 3 relate, 4 enrich, 5 embed-pack

# Add a redaction pass before Relate, drop LLM enrichment, then run
space = (
    pipeline
    .insert(3, PiiRedactStage())  # after Chunk, before Relate
    .drop("enrich")               # skip LLM work entirely
    .run("./docs", "./ai-ready")
)
```

A few common patterns:
- insert(3, …): a redaction or normalization pass between Chunk and Relate.
- append(MyExportStage()): an extra export or notification step after the archive is sealed.
- replace("relate", CustomRelationStage()): your own relation resolver in place of the built-in.
- drop("enrich"): skip LLM enrichment when no model is available or wanted. drop("embed-pack") yields a graph-only space with no vectors.
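To check the index arithmetic behind these patterns, here is a toy model of the stage list. ToyPipeline and ToyStage are hypothetical and only imitate the documented method names. How the real DirectoryPipeline stores stages, or what it does for an unknown name, is not covered here, so the KeyError below is an assumption.

```python
class ToyStage:
    def __init__(self, name):
        self.name = name

    def run(self, ctx):
        return ctx


class ToyPipeline:
    """Illustrative stand-in for the stage-list methods; not indx's code."""

    def __init__(self):
        names = ["walk", "parse", "chunk", "relate", "enrich", "embed-pack"]
        self._stages = [ToyStage(n) for n in names]

    def _index_of(self, name):
        for i, stage in enumerate(self._stages):
            if stage.name == name:
                return i
        raise KeyError(f"no stage named {name!r}")  # assumed behavior

    def insert(self, index, stage):
        self._stages.insert(index, stage)
        return self  # returning self is what enables chaining

    def append(self, stage):
        self._stages.append(stage)
        return self

    def replace(self, name, stage):
        self._stages[self._index_of(name)] = stage
        return self

    def drop(self, name):
        del self._stages[self._index_of(name)]
        return self

    def stages(self):
        return list(self._stages)  # copy, so callers cannot mutate the order


pipeline = (
    ToyPipeline()
    .insert(3, ToyStage("pii-redact"))
    .replace("relate", ToyStage("custom-relate"))
    .drop("enrich")
)
order = [s.name for s in pipeline.stages()]
```

Every insert shifts the indices of later stages, so addressing stages by name with replace() and drop() is less fragile than tracking positions by hand.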
Packaging a stage for reuse
Inserting a stage in code is perfect for project-specific logic. To make a stage installable and resolvable by name anywhere a built-in is, register it as an entry-point plugin under the indx.stages group:
```toml
# pyproject.toml of your package
[project.entry-points."indx.stages"]
pii-redact = "my_pkg.stages:PiiRedactStage"
```

Once installed, indx discovers it at import and merges it into the registry. See authoring a plugin for the full packaging recipe, including extras, lazy imports, and contract tests.
See also
- Pipeline and stages: how the shared context flows through the six stages.
- Protocols reference: the full Stage protocol and component protocols.
- Authoring a plugin: package and distribute stages and components by name.