Video Source Filter
The Video Source filter is an input filter for OpenFilter that reads video streams from various sources including files, webcams, and network streams (RTSP, HTTP, S3). It supports various video processing options like BGR/RGB conversion, synchronization modes, looping, FPS control, and image resizing. The filter uses the vidgear
library for robust video capture and processing.
Overview
The Video Source filter is designed to handle video input scenarios where you need to:
- Read video files from local storage or cloud storage (S3)
- Capture video from webcams and USB cameras
- Stream video from network sources (RTSP, HTTP)
- Control video playback parameters (FPS, looping, synchronization)
- Apply image transformations (resize, format conversion)
- Handle various video formats and codecs
- Process video streams in real-time or batch mode
Key Features
- Multiple Input Sources: Files, webcams, RTSP, HTTP, S3
- Video Format Support: Various codecs and containers
- Real-time Processing: Live video stream processing
- Synchronization Control: Frame synchronization options
- Looping Support: Continuous video playback
- FPS Control: Configurable frame rate output
- Image Resizing: Dynamic image size adjustment
- Format Conversion: BGR/RGB color space conversion
- Error Recovery: Robust error handling and recovery
Configuration
Basic Configuration
from openfilter.filter_runtime.filter import Filter
from openfilter.filter_runtime.filters.video_in import VideoIn
# Simple video file input
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='file:///path/to/video.mp4',
outputs='tcp://*:5550',
)),
])
Advanced Configuration with Multiple Options
# Video input with comprehensive options
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='file:///path/to/video.mp4',
outputs='tcp://*:5550',
bgr=True, # BGR color format
sync=True, # Enable synchronization
loop=True, # Loop video playback
fps=30, # Output 30 FPS
maxsize='1920x1080', # Maximum size
resize='800x600', # Resize to specific dimensions
)),
])
Environment Variables
You can configure via environment variables:
export FILTER_SOURCES="file:///path/to/video.mp4"
export FILTER_OUTPUTS="tcp://*:5550"
export FILTER_BGR="true"
export FILTER_SYNC="true"
export FILTER_LOOP="true"
export FILTER_FPS="30"
export FILTER_MAXSIZE="1920x1080"
export FILTER_RESIZE="800x600"
Input Sources
1. Local Video Files
File Path Format
sources='file:///path/to/video.mp4'
sources='file:///path/to/video.avi'
sources='file:///path/to/video.mov'
Supported Formats
- MP4: H.264, H.265, MPEG-4
- AVI: Various codecs
- MOV: QuickTime format
- MKV: Matroska format
- WMV: Windows Media Video
- FLV: Flash Video
- WebM: WebM format
File Examples
# Local file input
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='file:///home/user/videos/sample.mp4',
outputs='tcp://*:5550',
)),
])
2. Webcam and USB Camera
Camera URL Format
sources='webcam://0' # Default camera
sources='webcam://1' # Second camera
sources='webcam://2' # Third camera
Camera Examples
# Webcam input
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='webcam://0', # Default webcam
outputs='tcp://*:5550',
fps=30,
)),
])
# Multiple cameras
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='webcam://0', # Camera 1
outputs='tcp://*:5550',
)),
(VideoIn, dict(
sources='webcam://1', # Camera 2
outputs='tcp://*:5552',
)),
])
3. Network Streams (RTSP)
RTSP URL Format
sources='rtsp://username:password@ip:port/path'
sources='rtsp://192.168.1.100:554/stream1'
RTSP Examples
# IP camera stream
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='rtsp://admin:[email protected]:554/stream1',
outputs='tcp://*:5550',
sync=True, # Important for network streams
)),
])
# Multiple RTSP streams
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='rtsp://camera1.local/stream',
outputs='tcp://*:5550',
)),
(VideoIn, dict(
sources='rtsp://camera2.local/stream',
outputs='tcp://*:5552',
)),
])
4. HTTP Streams
HTTP URL Format
sources='http://server:port/stream'
sources='https://server:port/stream'
HTTP Examples
# HTTP video stream
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='http://192.168.1.100:8080/video',
outputs='tcp://*:5550',
)),
])
# HTTPS stream
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='https://streaming.server.com/live',
outputs='tcp://*:5550',
)),
])
5. Cloud Storage (S3)
S3 URL Format
sources='s3://bucket/path/to/video.mp4'
sources='s3://my-bucket/videos/sample.mp4'
S3 Examples
# S3 video file
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='s3://my-video-bucket/sample.mp4',
outputs='tcp://*:5550',
loop=True, # Loop for continuous processing
)),
])
Video Processing Options
Color Format (bgr
)
Controls the color format of output images:
bgr=True # BGR format (default for OpenCV)
bgr=False # RGB format (default for many ML frameworks)
Color Format Examples
# BGR format for OpenCV processing
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='file:///video.mp4',
outputs='tcp://*:5550',
bgr=True,
)),
(OpenCVProcessor, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
])
Synchronization (sync
)
Controls frame synchronization:
sync=True # Enable synchronization (recommended for network streams)
sync=False # Disable synchronization (faster processing)
Synchronization Use Cases
- Network Streams: Essential for RTSP/HTTP streams
- Real-time Processing: Maintains timing accuracy
- Multi-camera: Synchronizes multiple camera streams
- Performance: Can impact processing speed
Looping (loop
)
Controls video playback looping:
loop=True # Loop video playback
loop=False # Play once (default)
Looping Examples
# Continuous video processing
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='file:///short_video.mp4',
outputs='tcp://*:5550',
loop=True, # Loop for continuous processing
)),
(ObjectDetection, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
])
Frame Rate Control (fps
)
Controls output frame rate:
fps=30 # 30 FPS output
fps=60 # 60 FPS output
fps=None # Original video FPS (default)
FPS Control Examples
# Reduce frame rate for performance
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='file:///high_fps_video.mp4',
outputs='tcp://*:5550',
fps=15, # Reduce to 15 FPS
)),
(ImageProcessor, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
])
Image Resizing
Maximum Size (maxsize
)
Limits maximum image dimensions:
maxsize='1920x1080' # Maximum 1920x1080
maxsize='1280x720' # Maximum 1280x720
maxsize=None # No size limit (default)
Specific Resize (resize
)
Resizes to specific dimensions:
resize='800x600' # Resize to 800x600
resize='640x480' # Resize to 640x480
resize=None # No resize (default)
Resize Examples
# Resize for performance
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='file:///4k_video.mp4',
outputs='tcp://*:5550',
resize='1280x720', # Resize to HD
)),
(ObjectDetection, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
])
Usage Examples
Example 1: Basic Video File Processing
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='file:///path/to/video.mp4',
outputs='tcp://*:5550',
)),
(ObjectDetection, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(ImageOut, dict(
sources='tcp://localhost:5552',
outputs='file:///output/detected_{frame_number}.jpg',
)),
])
Behavior: Reads video file, performs object detection, and saves results.
Example 2: Webcam with Real-time Processing
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='webcam://0', # Default webcam
outputs='tcp://*:5550',
fps=30,
sync=True,
)),
(FaceDetection, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(Webvis, dict(
sources='tcp://localhost:5552',
outputs='tcp://*:5554',
port=8080,
)),
])
Behavior: Captures webcam feed, detects faces, and displays results in web browser.
Example 3: IP Camera Monitoring
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='rtsp://admin:[email protected]:554/stream1',
outputs='tcp://*:5550',
sync=True, # Important for network streams
maxsize='1920x1080',
)),
(MotionDetection, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(AlertSystem, dict(
sources='tcp://localhost:5552',
outputs='tcp://*:5554',
)),
])
Behavior: Monitors IP camera for motion and sends alerts.
Example 4: Multi-Camera Setup
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='webcam://0', # Camera 1
outputs='tcp://*:5550',
fps=15,
)),
(VideoIn, dict(
sources='webcam://1', # Camera 2
outputs='tcp://*:5552',
fps=15,
)),
(VideoIn, dict(
sources='rtsp://camera3.local/stream',
outputs='tcp://*:5554',
sync=True,
)),
(MultiCameraProcessor, dict(
sources=['tcp://localhost:5550', 'tcp://localhost:5552', 'tcp://localhost:5554'],
outputs='tcp://*:5556',
)),
])
Behavior: Processes multiple camera feeds simultaneously.
Example 5: Video Analysis with Looping
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='file:///short_video.mp4',
outputs='tcp://*:5550',
loop=True, # Loop for continuous analysis
resize='640x480',
)),
(Util, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
log=True, # Log frame data
)),
(ObjectDetection, dict(
sources='tcp://localhost:5552',
outputs='tcp://*:5554',
)),
(Recorder, dict(
sources='tcp://localhost:5554',
outputs='file:///analysis/detections.jsonl',
rules=['+main/data/detections'],
)),
])
Behavior: Continuously analyzes video and records detection results.
Example 6: S3 Video Processing
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='s3://my-video-bucket/sample.mp4',
outputs='tcp://*:5550',
loop=True, # Loop for continuous processing
maxsize='1280x720',
)),
(VideoProcessor, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(VideoOut, dict(
sources='tcp://localhost:5552',
outputs='s3://processed-bucket/processed_sample.mp4',
)),
])
Behavior: Processes video from S3 and saves results back to S3.
Performance Considerations
Video Processing Performance
- Resolution Impact: Higher resolution requires more processing power
- Frame Rate: Higher FPS increases processing load
- Codec Complexity: Different codecs have different decoding costs
- Network Latency: RTSP/HTTP streams add network overhead
Memory Usage
- Frame Buffering: Video frames consume significant memory
- Resize Operations: Image resizing requires additional memory
- Multiple Streams: Each stream consumes memory independently
- Looping: Looping can accumulate memory over time
Optimization Strategies
# Optimize for performance
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='file:///video.mp4',
outputs='tcp://*:5550',
resize='640x480', # Smaller images
fps=15, # Lower frame rate
sync=False, # Disable sync for speed
)),
])
Error Handling
Common Error Scenarios
- File Not Found: Video file doesn't exist
- Invalid Format: Unsupported video format
- Camera Access: Camera not available or in use
- Network Issues: RTSP/HTTP connection failures
- Codec Issues: Unsupported video codecs
- Permission Errors: File or camera access denied
Error Recovery
- Automatic Retry: Retries failed connections
- Graceful Degradation: Continues with available sources
- Error Logging: Logs errors for debugging
- Resource Cleanup: Proper cleanup on failures
Error Examples
# File not found
sources='file:///nonexistent.mp4' # Error: File not found
# Invalid camera index
sources='webcam://999' # Error: Camera not available
# Invalid RTSP URL
sources='rtsp://invalid.url/stream' # Error: Connection failed
Debugging and Monitoring
Debug Configuration
import logging
logging.basicConfig(level=logging.DEBUG)
# Enable video input debugging
export DEBUG_VIDEO_IN=true
export LOG_LEVEL=DEBUG
Debug Information
- Source Status: Shows source connection status
- Frame Information: Logs frame details and timing
- Error Details: Detailed error information
- Performance Metrics: Processing times and rates
Monitoring
- Frame Rates: Track actual vs. target frame rates
- Processing Times: Monitor frame processing duration
- Error Rates: Track connection and processing errors
- Resource Usage: Monitor memory and CPU usage
Troubleshooting
Common Issues
Video File Issues
- Check file format support
- Verify file permissions
- Ensure codec availability
- Check file corruption
Camera Issues
- Verify camera availability
- Check camera permissions
- Ensure camera not in use
- Test camera with other software
Network Stream Issues
- Check network connectivity
- Verify RTSP/HTTP URLs
- Check authentication credentials
- Monitor network latency
Performance Issues
- Reduce resolution and frame rate
- Check system resources
- Optimize processing pipeline
- Consider hardware acceleration
Debug Configuration
# Enable comprehensive debugging
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='file:///video.mp4',
outputs='tcp://*:5550',
sync=True, # Enable sync for debugging
fps=10, # Lower FPS for monitoring
)),
(Util, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
log=True, # Log frame data
)),
])
Advanced Usage
Custom Video Processing
# Custom video processing pipeline
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='file:///video.mp4',
outputs='tcp://*:5550',
resize='1280x720',
fps=30,
)),
(CustomProcessor, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
])
Multi-Format Support
# Support multiple video formats
sources = [
'file:///video1.mp4',
'file:///video2.avi',
'file:///video3.mov',
]
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources=sources,
outputs='tcp://*:5550',
loop=True,
)),
])
Dynamic Configuration
# Dynamic configuration based on source type
def get_video_config(source):
if source.startswith('rtsp://'):
return {'sync': True, 'fps': 15}
elif source.startswith('file://'):
return {'loop': True, 'fps': 30}
else:
return {'fps': 30}
# Apply configuration
config = get_video_config('rtsp://camera.local/stream')
Filter.run_multi([
# ... other filters above
(VideoIn, dict(
sources='rtsp://camera.local/stream',
outputs='tcp://*:5550',
**config
)),
])
API Reference
VideoInConfig
class VideoInConfig(FilterConfig):
sources: str | list[str] | list[tuple[str, dict[str, Any]]]
outputs: str | list[str] | list[tuple[str, dict[str, Any]]]
bgr: bool | None
sync: bool | None
loop: bool | None
fps: int | None
maxsize: str | None
resize: str | None
VideoIn
class VideoIn(Filter):
FILTER_TYPE = 'Input'
@classmethod
def normalize_config(cls, config)
def init(self, config)
def setup(self, config)
def shutdown(self)
def process(self, frames)
def read_video(self, source)
def process_frame(self, frame, source_config)
def apply_transforms(self, image, config)
def resize_image(self, image, size)
def convert_color(self, image, bgr)
Environment Variables
DEBUG_VIDEO_IN
: Enable debug loggingFILTER_SOURCES
: Video input sourcesFILTER_OUTPUTS
: Output destinationsFILTER_BGR
: Color format (BGR/RGB)FILTER_SYNC
: Enable synchronizationFILTER_LOOP
: Enable video loopingFILTER_FPS
: Output frame rateFILTER_MAXSIZE
: Maximum image sizeFILTER_RESIZE
: Image resize dimensions