🛡️ Rate limiting, user profiles, and anomaly logging for multi-tenant SaaS
| ← Back to README | Fast Start | User Isolation → |
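The Security Policy module protects your pactown deployment against abuse and overload through per-user rate limiting, tier-based resource limits, load-based throttling, and anomaly logging.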
Each user has a profile with resource limits based on their tier:
| Tier | Concurrent Services | Memory | CPU | Requests/min | Services/hour |
|---|---|---|---|---|---|
| FREE | 2 | 256MB | 25% | 20 | 5 |
| BASIC | 5 | 512MB | 50% | 60 | 20 |
| PRO | 10 | 2GB | 80% | 120 | 50 |
| ENTERPRISE | 50 | 8GB | 100% | 500 | 200 |
from pactown import UserProfile, UserTier
# Create from tier (uses defaults)
profile = UserProfile.from_tier("user123", UserTier.PRO)
# Create with custom limits
profile = UserProfile(
user_id="user123",
tier=UserTier.BASIC,
max_concurrent_services=10, # Override default
max_memory_mb=1024,
max_cpu_percent=60,
max_requests_per_minute=100,
max_services_per_hour=30,
)
Rate limiting uses a token bucket algorithm with a configurable request rate and burst size:
from pactown import RateLimiter
limiter = RateLimiter(
requests_per_minute=60,
burst_size=10, # Allow short bursts
)
# Check and consume token
if limiter.consume("user123"):
# Request allowed
process_request()
else:
# Rate limited
wait_time = limiter.get_wait_time("user123")
return f"Rate limited. Retry in {wait_time:.1f}s"
Before starting a service, multiple checks are performed:
from pactown import SecurityPolicy, get_security_policy
policy = get_security_policy()
# Check if user can start a service
result = await policy.check_can_start_service(
user_id="user123",
service_id="my-api",
port=8001,
)
if result.allowed:
# Start the service
if result.delay_seconds > 0:
await asyncio.sleep(result.delay_seconds) # Throttling
start_service()
else:
print(f"Denied: {result.reason}")
Automatic throttling based on server load:
from pactown import ResourceMonitor
monitor = ResourceMonitor(
cpu_threshold=80.0, # Start throttling at 80% CPU
memory_threshold=85.0, # Start throttling at 85% memory
check_interval=5.0, # Check every 5 seconds
)
# Check if overloaded
is_overloaded, metrics = monitor.check_overload()
if is_overloaded:
delay = monitor.get_throttle_delay()
print(f"Server overloaded, delay: {delay}s")
All security events are logged for admin review:
from pactown import AnomalyType, AnomalyLogger
logger = AnomalyLogger(
log_path=Path("/var/log/pactown-anomalies.jsonl"),
max_events=10000,
on_anomaly=lambda e: alert_admin(e), # Optional callback
)
# Log an anomaly
event = logger.log(
anomaly_type=AnomalyType.RATE_LIMIT_EXCEEDED,
details="User exceeded rate limit",
user_id="user123",
service_id="my-api",
severity="medium",
)
# Query anomalies
recent = logger.get_recent(count=100)
user_anomalies = logger.get_by_user("user123")
rate_limits = logger.get_by_type(AnomalyType.RATE_LIMIT_EXCEEDED)
| Type | Description | Severity |
|---|---|---|
| RATE_LIMIT_EXCEEDED | Too many requests | medium |
| CONCURRENT_LIMIT_EXCEEDED | Too many active services | medium |
| HOURLY_LIMIT_EXCEEDED | Too many service starts | medium |
| SERVER_OVERLOADED | System under heavy load | low |
| RAPID_RESTART | Frequent restart attempts | medium |
| UNAUTHORIZED_ACCESS | Invalid credentials or blocked user | high |
POST /runner/run
Content-Type: application/json
{
"project_id": 123,
"readme_content": "...",
"port": 10001,
"user_id": "user123",
"user_profile": {
"tier": "pro",
"max_concurrent_services": 10,
"max_memory_mb": 2048,
"max_cpu_percent": 80,
"max_requests_per_minute": 120,
"max_services_per_hour": 50
}
}
{
"success": false,
"message": "Concurrent service limit reached (2/2)",
"error_category": "permission",
"logs": [
"🔒 Security: Concurrent service limit reached (2/2)"
]
}
Get anomaly summary for monitoring:
policy = get_security_policy()
summary = policy.get_anomaly_summary(hours=24)
print(f"Total anomalies: {summary['total_count']}")
print(f"By type: {summary['by_type']}")
print(f"Top users: {summary['top_users']}")
{"timestamp": "2026-01-16T12:00:00", "type": "RATE_LIMIT_EXCEEDED", "user_id": "user123", "details": "Rate limit exceeded", "severity": "medium"}
{"timestamp": "2026-01-16T12:01:00", "type": "CONCURRENT_LIMIT_EXCEEDED", "user_id": "user456", "details": "Max 2 concurrent services", "severity": "medium"}
from pactown import SecurityPolicy, set_security_policy
policy = SecurityPolicy(
anomaly_log_path=Path("/var/log/pactown-anomalies.jsonl"),
default_rate_limit=60,
cpu_threshold=80.0,
memory_threshold=85.0,
on_anomaly=send_to_monitoring,
)
set_security_policy(policy)
from pactown import ServiceRunner, SecurityPolicy
custom_policy = SecurityPolicy(default_rate_limit=30)
runner = ServiceRunner(
sandbox_root="/tmp/sandboxes",
security_policy=custom_policy,
)
# Free tier - strict limits for trial users
free_profile = UserProfile.from_tier("trial_user", UserTier.FREE)
# Pro tier - reasonable limits for paying customers
pro_profile = UserProfile.from_tier("paying_user", UserTier.PRO)
def on_anomaly(event):
if event.severity == "high":
send_alert_to_slack(event)
log_to_prometheus(event)
policy = SecurityPolicy(on_anomaly=on_anomaly)
policy = get_security_policy()
profile = policy.get_user_profile("abuser123")
profile.is_blocked = True
profile.blocked_reason = "Repeated abuse"
policy.set_user_profile(profile)