OpenFilter Pipelines Controller
This operator provides a Kubernetes-native way to define and manage OpenFilter data-processing pipelines through the Pipeline custom resource definition (CRD). It runs within a Kubernetes cluster and automatically reconciles Pipeline resources, implementing a declarative model for defining, deploying, and managing filter-based data pipelines. The operator is built with controller-runtime and handles all aspects of Pipeline lifecycle management, including creation, updates, status tracking, and deletion.
Repository: github.com/PlainsightAI/openfilter-pipelines-controller
Overview
This guide gets you from zero to a working deployment of the OpenFilter Pipelines Controller, covers both execution modes (batch and stream), shows the required inputs for each mode, and walks through end‑to‑end demos using the manifests in this repository. It also includes kubectl validation steps we executed while writing this guide.
- Custom Resources:
  - `Pipeline` (recipe): defines the input source and the ordered filter containers.
  - `PipelineRun` (execution): references a `Pipeline` and executes it.
- Modes (`Pipeline.spec.mode`):
  - `batch` (default): object-storage batch processing via a single Kubernetes Job; work is distributed with Valkey Streams.
  - `stream`: real-time RTSP stream processing via a single-replica Kubernetes Deployment; optional Services expose ports from filters.
Prerequisites
The OpenFilter Pipelines Controller is compatible with any certified Kubernetes distribution, including K3s and SUSE Rancher.
- A Kubernetes cluster and `kubectl` configured against it.
- Container registry access if you build your own controller image.
- Valkey reachable from the controller (required for the controller to start; used by batch mode). You can deploy a development Valkey via `config/testing`.
- Optional object storage for batch pipelines (MinIO, AWS S3, GCS S3-compat, Azure Blob S3-compat, etc.).
- Optional RTSP source for streaming mode (or use the demo RTSP server under `hack/rtsp-stream`).
Versions (repo defaults):
- Go 1.25.1, controller-runtime v0.22.1, Kubebuilder v4.9.0.
Install the Controller (Helm)
The Helm chart is published to GitHub Pages for easy installation:
```shell
# Add the Helm repository
helm repo add openfilter-pipelines https://plainsightai.github.io/openfilter-pipelines-controller
helm repo update

# Install (includes in-cluster Valkey by default)
helm install openfilter-pipelines-controller openfilter-pipelines/openfilter-pipelines-controller \
  --namespace openfilter-system --create-namespace

# Or disable bundled Valkey and use an external endpoint
helm install openfilter-pipelines-controller openfilter-pipelines/openfilter-pipelines-controller \
  --namespace openfilter-system --create-namespace \
  --set valkey.enabled=false \
  --set controller.env[0].name=VALKEY_ADDR \
  --set controller.env[0].value="<host:port>" \
  --set controller.env[1].name=VALKEY_PASSWORD \
  --set controller.env[1].value="<password>"
```
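If you prefer a values file over repeated `--set` flags, the external-Valkey configuration above can be captured as follows. This is a sketch: the key names mirror the flags shown above, and the endpoint value is a placeholder.

```yaml
# values-external-valkey.yaml -- sketch; keys mirror the --set flags above
valkey:
  enabled: false                 # skip the bundled Valkey
controller:
  env:
    - name: VALKEY_ADDR
      value: "<host:port>"       # your external Valkey endpoint
    - name: VALKEY_PASSWORD
      value: "<password>"
```

Install with `helm install ... -f values-external-valkey.yaml` instead of the `--set` flags.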
Namespaces and ServiceAccount layout
- Batch mode: no special ServiceAccount is required for worker pods. The init “claimer” no longer patches pod annotations, and the controller infers ownership from Valkey using the consumer name (pod name). Jobs can run with the namespace’s default ServiceAccount.
- Stream mode: no special ServiceAccount is required. Streaming Deployments run with the namespace’s default ServiceAccount.
Modes and Inputs
These fields come from the CRD types in `api/v1alpha1/` and the controller code under `internal/controller/`.
Batch mode (`spec.mode: batch`)
- Purpose: process files from object storage using one Kubernetes Job.
- Work queue: Valkey Streams (required).
- Required `Pipeline` inputs:
  - `spec.source.bucket`:
    - `name` (string): bucket/container name.
    - Optional: `prefix` (string), `endpoint` (URL; use for MinIO/GCS/Azure S3-compat), `region`, `insecureSkipTLSVerify` (bool), `usePathStyle` (bool).
    - Optional `credentialsSecret`: Secret reference with keys `accessKeyId` and `secretAccessKey` (S3-compatible credentials).
  - `spec.filters[]`: ordered container steps. Each filter supports `image`, optional `command`, `args`, `env`, `resources`, and `config` (becomes `FILTER_<NAME>=<value>` envs).
  - Optional: `spec.videoInputPath` (default `/ws/input.mp4`), the path where the init “claimer” stages the file.
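Putting the batch inputs together, a `Pipeline` might look like the sketch below. The `apiVersion` group is an assumption inferred from the `filter.plainsight.ai/...` labels used elsewhere in this guide, and the filter name, image, and bucket values are hypothetical; compare with `demo/pipeline_batch.yaml` for the authoritative shape.

```yaml
# Sketch of a batch Pipeline -- apiVersion group and example values are
# assumptions; verify against api/v1alpha1/ and demo/pipeline_batch.yaml.
apiVersion: filter.plainsight.ai/v1alpha1
kind: Pipeline
metadata:
  name: pipeline-batch
spec:
  mode: batch
  source:
    bucket:
      name: my-videos                       # required: bucket/container name
      prefix: incoming/                     # optional
      endpoint: http://minio.minio:9000     # optional; S3-compatible stores
      usePathStyle: true                    # optional
      credentialsSecret:
        name: gcs-credentials               # keys: accessKeyId, secretAccessKey
  videoInputPath: /ws/input.mp4             # optional; default shown
  filters:
    - name: resize                          # hypothetical filter
      image: example.com/filters/resize:latest
      config:
        width: "640"                        # becomes FILTER_WIDTH=640
```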
- PipelineRun controls:
  - `spec.execution.parallelism` (default 10)
  - `spec.execution.maxAttempts` (default 3)
  - `spec.execution.pendingTimeout` (default 15m; reclaim stale work)
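A corresponding `PipelineRun` could be sketched as below. The exact spelling of the Pipeline reference field (`pipelineRef` here) and the `apiVersion` group are assumptions; compare with `demo/pipelinerun_batch.yaml` before relying on them.

```yaml
# Sketch of a batch PipelineRun -- the pipelineRef field name and apiVersion
# group are assumptions; see demo/pipelinerun_batch.yaml for the real shape.
apiVersion: filter.plainsight.ai/v1alpha1
kind: PipelineRun
metadata:
  generateName: pipeline-batch-run-   # lets kubectl create generate a name
spec:
  pipelineRef:
    name: pipeline-batch              # the Pipeline to execute
  execution:
    parallelism: 10                   # default
    maxAttempts: 3                    # default
    pendingTimeout: 15m               # default; reclaim stale work
```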
Stream mode (`spec.mode: stream`)
- Purpose: process a live RTSP source via a single-replica Deployment.
- No Valkey queue; the pipeline runs continuously (or until the optional idle timeout fires).
- Required `Pipeline` inputs:
  - `spec.source.rtsp`:
    - `host` (string), `port` (default 554), `path` (string)
    - Optional `credentialsSecret`: Secret with keys `username` and `password` (the controller injects `_RTSP_USERNAME`/`_RTSP_PASSWORD` and expands `RTSP_URL`).
    - Optional `idleTimeout` (duration): if the streaming pod remains Unready for this long, the controller marks the run complete and deletes the Deployment.
  - `spec.filters[]`: containers build the streaming pipeline; use `$(RTSP_URL)` in a filter config value to consume the injected URL.
  - Optional `spec.services[]`: expose specific ports from filters as Services. Each entry has `name` (filter name), `port`, and optional `targetPort` and `protocol`. Services are named `<pipelinerun-name>-<filter-name>-<index>`.
- PipelineRun: minimal; it only references the Pipeline.
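The stream inputs above can be sketched as a single manifest. As with the batch sketch, the `apiVersion` group, field nesting, and example hosts, paths, and images are assumptions; `demo/pipeline_rtsp.yaml` is the authoritative reference.

```yaml
# Sketch of a streaming Pipeline -- apiVersion group, nesting, and example
# values are assumptions; compare with demo/pipeline_rtsp.yaml.
apiVersion: filter.plainsight.ai/v1alpha1
kind: Pipeline
metadata:
  name: pipeline-rtsp
spec:
  mode: stream
  source:
    rtsp:
      host: rtsp-video-stream       # e.g. the demo Service from hack/rtsp-stream
      port: 8554                    # default is 554
      path: /stream                 # hypothetical stream path
      credentialsSecret:
        name: rtsp-credentials      # keys: username, password
      idleTimeout: 10m              # optional; complete the run after sustained Unready
  filters:
    - name: ingest                  # hypothetical filter
      image: example.com/filters/ingest:latest
      config:
        sources: "$(RTSP_URL)"      # expanded by the controller
    - name: webvis                  # hypothetical visualization filter
      image: example.com/filters/webvis:latest
  services:
    - name: webvis                  # filter name to expose
      port: 8080                    # Service named <pipelinerun-name>-webvis-0
```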
Quickstarts
The demos below assume the controller is installed via Helm. Replace `<ns>` with your Helm release namespace (for example, `openfilter-system`).
A. Batch mode demo (demo/pipeline_batch.yaml)
- Create an object storage credentials Secret in `<ns>` (the example shows S3/MinIO-style keys):
```shell
kubectl -n <ns> create secret generic gcs-credentials \
  --from-literal=accessKeyId="<ACCESS_KEY>" \
  --from-literal=secretAccessKey="<SECRET_KEY>"
```
- Review and update `demo/pipeline_batch.yaml` with your bucket details, then apply:
```shell
kubectl -n <ns> apply -f demo/pipeline_batch.yaml
```
- Start a run (uses `generateName`):
```shell
kubectl -n <ns> create -f demo/pipelinerun_batch.yaml
```
- Observe progress:
```shell
# List PipelineRuns and watch status counts
kubectl -n <ns> get pipelineruns
kubectl -n <ns> describe pipelinerun <generated-name>

# List the Job and pods created for the run
kubectl -n <ns> get job
kubectl -n <ns> get pods -l filter.plainsight.ai/pipelinerun=<generated-name>

# Tail a worker pod
kubectl -n <ns> logs -f <worker-pod-name>
```
- Cleanup:
```shell
kubectl -n <ns> delete pipelinerun <generated-name>
kubectl -n <ns> delete pipeline pipeline-batch
```
B. Stream mode demo with RTSP (demo/pipeline_rtsp.yaml)
You need an RTSP endpoint. The repo includes a MediaMTX-based demo server under `hack/rtsp-stream/`.
- Deploy the RTSP demo server (defaults to an `rtsp-video-stream` Service on port 8554):
```shell
kubectl -n <ns> apply -f hack/rtsp-stream/   # deployment/service
kubectl -n <ns> get pods -w                  # wait until Ready
```
- Apply the streaming Pipeline and start a run:
```shell
kubectl -n <ns> apply -f demo/pipeline_rtsp.yaml
kubectl -n <ns> apply -f demo/pipelinerun_rtsp.yaml
```
- Check Deployment status and optional Services:
```shell
# The controller creates a Deployment named <pipelinerun-name>-deploy
kubectl -n <ns> get deploy,pods -l pipelinerun=pipelinerun-rtsp

# If your Pipeline defines spec.services (e.g., webvis on 8080)
kubectl -n <ns> get svc
kubectl -n <ns> port-forward svc/pipelinerun-rtsp-webvis-0 8080:8080   # browse http://localhost:8080
```
- Cleanup:
```shell
kubectl -n <ns> delete pipelinerun pipelinerun-rtsp
kubectl -n <ns> delete pipeline pipeline-rtsp
kubectl -n <ns> delete -f hack/rtsp-stream/
```
Common kubectl snippets
```shell
# Pipelines and runs
kubectl -n <ns> get pipelines
kubectl -n <ns> get pipelineruns
kubectl -n <ns> describe pipelinerun <name>

# Batch: list pods by run
kubectl -n <ns> get pods -l filter.plainsight.ai/pipelinerun=<run-name>

# Stream: deployment/pods created for a run
kubectl -n <ns> get deploy,pods -l pipelinerun=<run-name>
```
Troubleshooting
- Controller won’t start: set `VALKEY_ADDR` (and `VALKEY_PASSWORD` if needed) on the controller Deployment, or use the `config/testing` overlay.
- Batch run stays pending: ensure the Valkey Service is reachable from the controller and pods; confirm the object storage credentials Secret exists in the Pipeline’s namespace with keys `accessKeyId` and `secretAccessKey`.
- Workers no longer patch pods: the controller infers ownership via Valkey. No special ServiceAccount is required in workload namespaces.
- Streaming run never becomes Ready: verify the RTSP `host:port/path` and credentials; check pod logs for connection errors.