REST Connect Filter
The REST Connect filter is an input filter for OpenFilter that provides HTTP REST API endpoints to receive data from external sources. It exposes FastAPI endpoints that can accept JSON payloads and binary data, then forwards the received data through the OpenFilter pipeline as Frame objects.
Overview
The REST Connect filter is designed to handle HTTP-based data ingestion scenarios where you need to:
- Accept data from web applications and external systems
- Provide RESTful API endpoints for data ingestion
- Support various HTTP methods (GET, POST, PUT, PATCH, DELETE)
- Load local files from a specified resource path
- Convert HTTP requests into OpenFilter Frame objects
- Handle JSON and binary data payloads
Key Features
- REST API Endpoints: FastAPI-based HTTP server with multiple endpoints
- JSON Payload Support: Direct JSON data ingestion
- Local File Loading: Serve files from local resource path
- Path Parameters: Dynamic endpoint routing with path parameters
- HTTP Methods: Support for GET, POST, PUT, PATCH, DELETE
- Data Conversion: Automatic conversion to Frame objects
- Resource Management: Local file serving capabilities
Configuration
Basic Configuration
from openfilter.filter_runtime.filter import Filter
from openfilter.filter_runtime.filters.rest import Rest
# Simple REST API server
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
)),
])
Advanced Configuration
# REST API with local resources
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
resource_path='/path/to/local/files',
methods=['GET', 'POST', 'PUT'],
endpoints=['/data', '/files/{filename}'],
)),
])
Environment Variables
You can configure via environment variables:
export FILTER_OUTPUTS="tcp://*:5550"
export FILTER_HOST="0.0.0.0"
export FILTER_PORT="8080"
export FILTER_RESOURCE_PATH="/path/to/files"
export FILTER_METHODS="GET,POST,PUT"
export FILTER_ENDPOINTS="/data,/files/{filename}"
HTTP Endpoints
Default Endpoints
The REST Connect filter provides several built-in endpoints:
1. Root Endpoint (/
)
- Method: GET
- Purpose: Health check and basic information
- Response: JSON with server status
2. Data Endpoint (/data
)
- Method: POST, PUT, PATCH
- Purpose: JSON data ingestion
- Content-Type:
application/json
- Body: JSON payload
3. Dynamic File Endpoint (/files/{filename}
)
- Method: GET, POST, PUT, PATCH, DELETE
- Purpose: File operations with path parameters
- Path Parameters:
filename
- the file to operate on
Custom Endpoints
You can define custom endpoints using the endpoints
configuration:
endpoints=[
'/api/v1/data',
'/api/v1/files/{filename}',
'/custom/endpoint/{id}',
]
HTTP Methods
Supported Methods
- GET: Retrieve data and files
- POST: Send data
- PUT: Update/replace data
- PATCH: Partial data updates
- DELETE: Remove data
Method Configuration
methods=['GET', 'POST', 'PUT', 'PATCH', 'DELETE']
Data Handling
JSON Payload Processing
The REST Connect filter converts JSON payloads into Frame objects:
# Incoming JSON
{
"detections": [
{"class": "person", "confidence": 0.95, "bbox": [100, 100, 200, 200]}
],
"timestamp": "2024-01-15T10:30:00Z",
"camera_id": "cam1"
}
# Converted to Frame object
frame = Frame()
frame.data = {
"detections": [...],
"timestamp": "...",
"camera_id": "cam1"
}
Local File Serving
Resource Path Configuration
The resource_path
parameter specifies where local files are served from:
resource_path='/path/to/local/files'
File Access
Files can be accessed via the /files/{filename}
endpoint:
GET /files/image1.jpg
GET /files/data/sensor_readings.json
GET /files/videos/sample.mp4
File Operations
Different HTTP methods provide different file operations:
- GET: Download/read file
- POST: Create file
- PUT: Replace file content
- PATCH: Update file metadata
- DELETE: Remove file
Usage Examples
Example 1: Basic REST API Server
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
)),
(ObjectDetection, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(ImageOut, dict(
sources='tcp://localhost:5552',
outputs='file:///output/detected_{frame_number}.jpg',
)),
])
Behavior: Accepts HTTP requests, processes data through object detection, and saves results.
Example 2: Data Processing Pipeline
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
resource_path='/data',
endpoints=['/data', '/files/{filename}'],
)),
(DataProcessor, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(OutputFilter, dict(
sources='tcp://localhost:5552',
outputs='file:///processed/data.json',
)),
])
Behavior: Accepts data requests, processes them, and saves processed results.
Example 3: JSON Data Ingestion
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
methods=['POST', 'PUT'],
endpoints=['/data', '/api/v1/sensors'],
)),
(DataProcessor, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(Recorder, dict(
sources='tcp://localhost:5552',
outputs='file:///logs/sensor_data.jsonl',
rules=['+'],
)),
])
Behavior: Accepts JSON sensor data, processes it, and logs results.
Example 4: Multi-Endpoint API
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
resource_path='/resources',
endpoints=[
'/api/v1/data',
'/api/v1/files/{filename}',
'/health',
'/status',
],
)),
(Router, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
routing_rules={
'data': 'tcp://*:5554',
'files': 'tcp://*:5555',
},
)),
])
Behavior: Provides multiple API endpoints with different processing paths.
Example 5: Webhook Integration
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
endpoints=['/webhook/{source}'],
)),
(WebhookProcessor, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(NotificationSender, dict(
sources='tcp://localhost:5552',
outputs='tcp://*:5554',
)),
])
Behavior: Accepts webhook calls from external services and processes them.
Example 6: File Management API
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
resource_path='/files',
methods=['GET', 'POST', 'PUT', 'DELETE'],
endpoints=['/files/{filename}'],
)),
(FileProcessor, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(FileManager, dict(
sources='tcp://localhost:5552',
outputs='tcp://*:5554',
)),
])
Behavior: Provides complete file management API with CRUD operations.
API Usage Examples
Sending JSON Data
Using curl
# Send JSON data
curl -X POST http://localhost:8080/data \
-H "Content-Type: application/json" \
-d '{
"detections": [
{"class": "person", "confidence": 0.95, "bbox": [100, 100, 200, 200]}
],
"timestamp": "2024-01-15T10:30:00Z",
"camera_id": "cam1"
}'
Using Python requests
import requests
import json
# Send JSON data
data = {
"detections": [
{"class": "person", "confidence": 0.95, "bbox": [100, 100, 200, 200]}
],
"timestamp": "2024-01-15T10:30:00Z",
"camera_id": "cam1"
}
response = requests.post('http://localhost:8080/data', json=data)
print(response.json())
File Operations
Download file
# Download a file
curl -X GET http://localhost:8080/files/data1.json -o downloaded_data.json
Create file
# Create a new file
curl -X POST http://localhost:8080/files/new_data.json \
-H "Content-Type: application/json" \
-d '{"data": "content"}'
Delete file
# Delete a file
curl -X DELETE http://localhost:8080/files/old_data.json
Error Handling
HTTP Status Codes
- 200 OK: Successful operation
- 400 Bad Request: Invalid request data
- 404 Not Found: Endpoint or file not found
- 405 Method Not Allowed: Unsupported HTTP method
- 413 Payload Too Large: File too large
- 500 Internal Server Error: Server processing error
Error Response Format
{
"error": "Error description",
"detail": "Detailed error information",
"status_code": 400
}
Common Error Scenarios
- Invalid JSON: Malformed JSON payload
- Missing Files: File not found in resource path
- Permission Errors: File access permission issues
- Large Data: Data exceeding size limits
- Unsupported Methods: HTTP method not allowed
- Missing Parameters: Required parameters not provided
Security Considerations
Input Validation
- Data Type Validation: Restrict allowed data types
- Size Limits: Limit data payload sizes
- Path Traversal: Prevent directory traversal attacks
- Content Validation: Validate received content
Access Control
- Authentication: Implement authentication mechanisms
- Authorization: Control access to endpoints
- Rate Limiting: Prevent abuse and DoS attacks
- CORS: Configure cross-origin resource sharing
Security Best Practices
# Example security configuration
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='127.0.0.1', # Bind to localhost only
port=8080,
resource_path='/secure/data', # Restricted path
# Add authentication middleware
# Add rate limiting
# Add input validation
)),
])
Performance Considerations
Request Handling
- Async Processing: FastAPI provides async request handling
- Connection Pooling: Efficient HTTP connection management
- Memory Usage: Monitor memory usage for large data payloads
- Processing Time: Consider downstream processing time
File Operations
- Streaming: Stream large files instead of loading into memory
- Caching: Cache frequently accessed files
- Compression: Use compression for large responses
- CDN Integration: Use CDN for static file serving
Monitoring
- Request Metrics: Monitor request rates and response times
- Error Rates: Track HTTP error rates
- Data Operations: Monitor data processing rates
- Resource Usage: Track CPU and memory usage
Troubleshooting
Common Issues
Server Not Starting
- Check port availability
- Verify host binding
- Check firewall settings
- Validate configuration parameters
Data Processing Failures
- Check data size limits
- Verify data permissions
- Ensure sufficient disk space
- Validate data format support
JSON Processing Errors
- Validate JSON syntax
- Check content-type headers
- Verify data structure
- Handle encoding issues
Path Parameter Issues
- Validate endpoint patterns
- Check parameter extraction
- Verify file path construction
- Handle special characters
Debug Configuration
import logging
logging.basicConfig(level=logging.DEBUG)
# Enable REST Connect filter debugging
export DEBUG_REST=true
export LOG_LEVEL=DEBUG
Debug Information
- Request Logging: Log all incoming requests
- Response Logging: Log response details
- Data Operations: Log data access operations
- Error Details: Detailed error information
Advanced Usage
Custom Middleware
# Add custom middleware for authentication, logging, etc.
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
Custom Endpoints
# Define additional custom endpoints
@rest_app.post("/custom/endpoint")
async def custom_endpoint(data: dict):
# Custom processing logic
return {"status": "processed", "data": data}
Integration with External Systems
# Integrate with external APIs and services
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
)),
(ExternalAPIClient, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
api_url='https://external-api.com',
api_key='your-api-key',
)),
(ResponseFormatter, dict(
sources='tcp://localhost:5552',
outputs='tcp://*:5554',
)),
])
API Reference
RestConfig
class RestConfig(FilterConfig):
outputs: str | list[str] | list[tuple[str, dict[str, Any]]]
host: str
port: int
resource_path: str | None
methods: list[str] | None
endpoints: list[str] | None
Rest
class Rest(Filter):
FILTER_TYPE = 'Input'
@classmethod
def normalize_config(cls, config)
def init(self, config)
def setup(self, config)
def shutdown(self)
def process(self, frames)
@staticmethod
def load_file(filename: str, resource_path: str | None) -> bytes
@staticmethod
def parse_multipart_data(content: bytes, content_type: str) -> tuple[dict[str, Any], dict[str, bytes]]
@staticmethod
def parse_json_data(content: bytes) -> dict[str, Any]
@staticmethod
def parse_form_data(content: bytes, content_type: str) -> dict[str, Any]
Environment Variables
DEBUG_REST
: Enable debug loggingFILTER_OUTPUTS
: Output destinationsFILTER_HOST
: Server host addressFILTER_PORT
: Server port numberFILTER_RESOURCE_PATH
: Local file resource pathFILTER_METHODS
: Allowed HTTP methodsFILTER_ENDPOINTS
: Custom endpoint patterns