Environment API Implementation Guide¶
🎯 Overview¶
This document provides comprehensive guidance on implementing environment-related APIs in the MBPanel system, covering:
- How environment data is accessed from owner accounts
- How invited team members view environments
- Permission-based access control for environments
- Job system integration for async operations
- Sync architecture and staleness management
📐 Architecture Context¶
Team-Centric Model¶
MBPanel uses a Team-Centric Architecture where:
- Owner: The user who created the team and owns the Virtuozzo account
- Members: Invited users who access resources through team membership
- Environments: Infrastructure resources owned by the team (via Virtuozzo)
graph TD
Owner[Owner User] -->|Creates| Team[Team Entity]
Team -->|Stores| VZCreds[VZ Credentials Encrypted]
VZCreds -->|Authenticates| Virtuozzo[Virtuozzo Infrastructure]
Virtuozzo -->|Manages| Envs[Environments]
Member1[Member User] -->|Joins| Team
Member2[Member User] -->|Joins| Team
Member1 -->|Reads via| Cache[Environment Cache]
Member2 -->|Reads via| Cache
Owner -->|Reads via| Cache
Cache -->|Syncs from| Virtuozzo
Team -->|Controls Access| RBAC[RBAC System]
RBAC -->|Enforces| Permissions[Permissions]
Key Principles¶
- Single Source of Truth: Virtuozzo is the authoritative source for environment data
- Backend Proxy Pattern: Frontend never calls Virtuozzo directly; backend acts as secure proxy
- Cache-Aside with Sync: Postgres caches Virtuozzo data with explicit freshness tracking
- Permission-Based Access: All operations are gated by role-based permissions
- Job-Based Async: Long-running operations (create, modify) use job system
🔐 Access Control & Permissions¶
Owner Account Access¶
The owner is the team member who:
- Registered with Virtuozzo credentials OR created a new Virtuozzo account
- Has their Virtuozzo session key stored encrypted in teams.session_key_encrypted
- Has their Virtuozzo UID stored in teams.vz_uid
- Automatically gets the "Owner" role with * (wildcard) permission
Critical Implementation Notes:
- Owner credentials are encrypted at rest using Fernet encryption (see app/core/crypto.py)
- Session keys are refreshed and re-encrypted on owner login
- Only the owner's session key is used for Virtuozzo API calls (per Virtuozzo multi-user model)
Member Access¶
Invited members access environments through:
1. Team Membership: Link in team_members table
2. Role Assignment: Each member has a role (Owner, Manager, Developer, or custom)
3. Permission Inheritance: Permissions are derived from their role
Key Difference from Owner:
- Members do NOT have personal Virtuozzo credentials
- Members read from the same cached environment data as the owner
- Members' actions are proxied through the backend using the owner's Virtuozzo session
Permission Matrix¶
| Operation | Required Permission | Owner | Manager | Developer |
|---|---|---|---|---|
| List environments | (team membership) | ✅ | ✅ | ✅ |
| View environment details | (team membership) | ✅ | ✅ | ✅ |
| Create environment | server.create | ✅ | ✅ | ✅ |
| Restart environment | server.restart | ✅ | ✅ | ✅ |
| Delete environment | server.delete | ✅ | ✅ | ❌ |
| Modify team billing | billing.edit | ✅ | ✅ | ❌ |
| Invite members | team.invite | ✅ | ✅ | ❌ |
| Manage roles | team.manage | ✅ | ❌ | ❌ |
Note: The permission matrix can be customized per team using the RBAC system. The above reflects the default role permissions seeded during team creation.
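The default matrix above can be written as a small lookup, which is handy when testing permission gates. This is a minimal sketch and not the project's RBAC implementation: `DEFAULT_ROLE_PERMISSIONS` and `has_permission` are hypothetical names, the role contents are copied from the table, and the only rule assumed beyond it is that the `*` wildcard grants every permission.

```python
# Hypothetical defaults mirroring the permission matrix; real teams may customize these.
DEFAULT_ROLE_PERMISSIONS: dict[str, frozenset[str]] = {
    "Owner": frozenset({"*"}),
    "Manager": frozenset(
        {
            "server.create",
            "server.restart",
            "server.delete",
            "billing.edit",
            "team.invite",
        }
    ),
    "Developer": frozenset({"server.create", "server.restart"}),
}


def has_permission(role: str, required: str) -> bool:
    """Return True if the role grants `required`, either directly or via the wildcard."""
    granted = DEFAULT_ROLE_PERMISSIONS.get(role, frozenset())
    return "*" in granted or required in granted


if __name__ == "__main__":
    for role in DEFAULT_ROLE_PERMISSIONS:
        print(role, "can delete environments:", has_permission(role, "server.delete"))
```

Listing and viewing environments need no entry here because those operations are gated by team membership rather than by a named permission.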
🛠️ API Endpoint Implementation¶
Virtuozzo API Integration¶
Before implementing our backend endpoints, understand the underlying Virtuozzo API:
Virtuozzo GetEnvs Endpoint: GET /environment/control/rest/getenvs (relative to the configured Virtuozzo base URL)
Parameters:
- session (required): User session key or personal access token
- lazy (optional): true = load only basic metadata, false = full environment info
- appid (optional): Unique application identifier (for authentication)
- ownerUid (optional): Target user account UID
Example Call:
# backend/app/infrastructure/external/virtuozzo/service.py
async def fetch_environments_and_nodes(
session_key: str,
lazy: bool = False,
owner_uid: int | None = None
) -> dict:
"""
Call Virtuozzo GetEnvs API.
Returns nested structure with environments, nodes, and metadata.
"""
params = {"session": session_key}
if lazy:
params["lazy"] = "true"
if owner_uid:
params["ownerUid"] = owner_uid
async with create_external_async_client() as client:
response = await client.get(
f"{settings.virtuozzo_base_url}/environment/control/rest/getenvs",
params=params,
timeout=30.0
)
response.raise_for_status()
return response.json()
Virtuozzo Response Structure:
{
"result": 0,
"error": null,
"infos": [
{
"env": {
"envName": "myapp-prod",
"displayName": "Production App",
"shortdomain": "myapp",
"status": 1,
"uid": 12345,
"creatorUid": 12345,
"appid": "wp-cluster",
"domain": "myapp.jelastic.cloud",
"extDomains": [{"extdomains": "www.example.com"}],
"isFirewallEnabled": true,
"createdOn": "2026-01-10 14:30:00",
// ... many more fields
},
"envGroups": ["production"],
"nodes": [
{
"id": 98765,
"name": "cp-node",
"nodeGroup": "cp",
"status": 1,
"fixedCloudlets": 4,
"flexibleCloudlets": 8,
"address": "192.168.1.10",
"intIP": "10.0.0.5",
"extIps": [{"extIPs": "203.0.113.25"}],
// ... many more fields
}
]
}
]
}
Response Field Mapping:
| Virtuozzo Field | Type | Maps To | Notes |
|---|---|---|---|
| result | int | - | 0 = success, non-zero = error |
| error | string | - | Error message if result != 0 |
| infos | array | Environments list | Array of environment objects |
| infos[].env | object | Environment model | Core environment data |
| infos[].envGroups | array | Environment.envGroups | Tags like ["staging"], ["production"] |
| infos[].nodes | array | Node model | Array of node objects |
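To make the mapping concrete, the sketch below walks a GetEnvs payload shaped like the sample response and splits each entry into its `env`, `envGroups`, and `nodes` parts. It assumes only the fields listed in the table; `GetEnvsError` and `split_getenvs_response` are hypothetical names introduced for illustration.

```python
from typing import Any


class GetEnvsError(Exception):
    """Hypothetical error raised when the payload carries a non-zero result code."""


def split_getenvs_response(
    payload: dict[str, Any],
) -> list[tuple[dict[str, Any], list[str], list[dict[str, Any]]]]:
    """Return one (env, env_groups, nodes) triple per entry in `infos`."""
    result = payload.get("result")
    if result != 0:
        # result != 0 signals an API-level failure even if the HTTP status was 2xx.
        raise GetEnvsError(f"GetEnvs failed (result={result}): {payload.get('error')}")

    triples = []
    for info in payload.get("infos") or []:
        env = info.get("env") or {}
        env_groups = info.get("envGroups") or []
        nodes = info.get("nodes") or []
        triples.append((env, env_groups, nodes))
    return triples


if __name__ == "__main__":
    sample = {
        "result": 0,
        "error": None,
        "infos": [
            {
                "env": {"envName": "myapp-prod", "status": 1},
                "envGroups": ["production"],
                "nodes": [{"id": 98765, "nodeGroup": "cp"}],
            }
        ],
    }
    for env, groups, nodes in split_getenvs_response(sample):
        print(env["envName"], groups, [n["id"] for n in nodes])
```

Checking `result` before touching `infos` keeps API-level failures from being mistaken for an empty environment list.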
1. List Environments¶
Endpoint: GET /api/v1/environments
Purpose: Retrieve all environments for the current team from cached data.
Implementation:
# backend/app/api/v1/environments.py
@router.get("/", summary="List environments for the current team")
async def list_environments(
db: Annotated[AsyncSession, Depends(db_session)],
current_user: Annotated[AuthenticatedUser, Depends(get_current_user)],
) -> dict[str, list]:
items = await service.list_environments(db=db, current_user=current_user)
return {"items": items}
Service Layer:
# backend/app/domains/environments/service.py
async def list_environments(
*,
db: AsyncSession,
current_user: AuthenticatedUser
) -> list[dict[str, Any]]:
"""
List all environments for the current user's team.
Data Flow:
1. Validate team context exists
2. Query cached environments from Postgres (NOT Virtuozzo API)
3. Serialize with sync metadata
4. Return to frontend
Note: This endpoint reads from cache. To refresh data, trigger a sync job.
"""
if current_user.team_id is None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Missing team context."
)
rows = await repository.list_team_environments(
db=db,
team_id=int(current_user.team_id)
)
return [_serialize_environment(env) for env in rows]
Key Implementation Points:
- Team Context Validation: Always verify current_user.team_id is present
- No Direct VZ Calls: Read from the Postgres cache (environments table), NOT the Virtuozzo API
- Automatic Team Scoping: Repository filters by team_id automatically
- Freshness Metadata: Include sync status in response (see Sync section below)
- Sync vs List:
  - List endpoint = Fast read from cache
  - Sync job = Calls Virtuozzo GetEnvs API and updates cache
Response Schema:
{
"items": [
{
"name": "myapp-prod",
"display_name": "Production App",
"status": "RUNNING",
"created_at": "2026-01-10T14:30:00Z",
"cloudlets_baseline": 4,
"cloudlets_max": 8,
"sync_status": {
"freshness": "current",
"last_synced_at": "2026-01-15T10:00:00Z"
}
}
]
}
2. Create Environment (Async Job)¶
Endpoint: POST /api/v1/environments
Purpose: Provision a new environment (asynchronous operation using job system).
Implementation:
# backend/app/api/v1/environments.py
@router.post(
"/",
response_model=EnvironmentCreateResponse,
status_code=status.HTTP_202_ACCEPTED,
summary="Provision a new environment (async job)",
)
async def create_environment(
request: EnvironmentCreateRequest,
db: Annotated[AsyncSession, Depends(db_session)],
current_user: Annotated[
AuthenticatedUser,
Depends(require_permission("server.create"))
],
) -> EnvironmentCreateResponse:
return await service.enqueue_environment_creation(
db=db,
current_user=current_user,
payload=request
)
Permission Enforcement:
- Uses require_permission("server.create") dependency
- Automatically checks if user's role includes this permission
- Returns 403 if permission is missing
Service Implementation:
async def enqueue_environment_creation(
*,
db: AsyncSession,
current_user: AuthenticatedUser,
payload: EnvironmentCreateRequest,
) -> EnvironmentCreateResponse:
# 1. Validate team context
if current_user.team_id is None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Missing team context."
)
# 2. Load team and validate state
team = await db.get(Team, int(current_user.team_id))
if not team:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Team not found."
)
# 3. Validate team has required Virtuozzo credentials
if not team.session_key_encrypted:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Team missing Virtuozzo session key. Complete onboarding first.",
)
if not team.vz_uid:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Team missing Virtuozzo UID. Complete onboarding first.",
)
# 4. Validate team subscription status
if team.status not in {"TRIALING", "ACTIVE"}:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Team subscription is not active.",
)
# 5. Check for naming conflicts
shortdomain = payload.shortdomain.lower()
if await repository.shortdomain_exists(
db=db,
team_id=team.id,
shortdomain=shortdomain
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Shortdomain already exists."
)
# 6. Check for in-flight jobs for same shortdomain
if await repository.has_active_job_for_shortdomain(
db=db,
team_id=team.id,
shortdomain=shortdomain
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Provisioning already in progress for this name.",
)
# 7. Create job record and enqueue
job_id = uuid.uuid4()
job_payload = CreateEnvironmentJobPayload(
job_id=job_id,
team_id=team.id,
user_id=current_user.user_id,
display_name=payload.display_name,
shortdomain=shortdomain,
owner_uid=team.vz_uid,
session_key_encrypted=team.session_key_encrypted,
env=_build_env_spec(payload),
nodes=_build_nodes_spec(payload),
)
# 8. Record job log
await _insert_job_log(db, job_id, team.id, current_user.user_id, ...)
# 9. Publish status event (SSE notification)
await _publish_job_status_event(
team_id=team.id,
job_id=job_id,
status=EnvironmentJobStatus.QUEUED,
message="Provisioning request queued.",
env_name=shortdomain,
)
# 10. Dispatch to job queue
_dispatch_job(job_payload)
return EnvironmentCreateResponse(
job_id=job_id,
status=EnvironmentJobStatus.QUEUED
)
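Response (202 Accepted): the body is the EnvironmentCreateResponse built above, carrying the new job_id and the initial QUEUED status.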
Job Status Tracking:
Clients can track job progress via:
1. SSE Events: Subscribe to /api/v1/events?job_id={job_id}
2. Polling: Query job status (future endpoint)
Example SSE event stream:
event: job.status
data: {"job_id": "...", "status": "queued", "message": "Job queued"}
event: job.status
data: {"job_id": "...", "status": "started", "message": "Calling Virtuozzo API"}
event: job.status
data: {"job_id": "...", "status": "polling", "message": "Waiting for environment activation"}
event: job.status
data: {"job_id": "...", "status": "completed", "env_name": "myapp-prod", "app_id": "123"}
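On the client side, the stream above has to be split back into events. The following is a minimal sketch of a parser for the standard Server-Sent Events wire format, in which each event ends with a blank line. `iter_sse_events` and `TERMINAL_STATUSES` are hypothetical names, and treating `completed` and `failed` as the terminal statuses is an assumption based on the statuses shown in this guide.

```python
import json
from typing import Iterable, Iterator, Tuple

# Assumption: these two statuses end a job's event stream.
TERMINAL_STATUSES = {"completed", "failed"}


def iter_sse_events(lines: Iterable[str]) -> Iterator[Tuple[str, dict]]:
    """Yield (event_name, json_payload) pairs from SSE text lines."""
    event_name = "message"  # SSE default when no "event:" field is sent
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line dispatches the event collected so far.
            if data_lines:
                yield event_name, json.loads("\n".join(data_lines))
            event_name, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment / keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # the format strips one leading space
        if field == "event":
            event_name = value
        elif field == "data":
            data_lines.append(value)
    # Convenience beyond the wire format: flush an event left unterminated at end of input.
    if data_lines:
        yield event_name, json.loads("\n".join(data_lines))


if __name__ == "__main__":
    stream = [
        "event: job.status",
        'data: {"job_id": "j1", "status": "queued"}',
        "",
        "event: job.status",
        'data: {"job_id": "j1", "status": "completed"}',
        "",
    ]
    for name, payload in iter_sse_events(stream):
        done = payload["status"] in TERMINAL_STATUSES
        print(name, payload["status"], "(terminal)" if done else "")
```

A real client would feed this generator from a streaming HTTP response and stop reading once a terminal status arrives.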
🔄 Data Mapping & Sync Patterns¶
Virtuozzo API Response Structure¶
When calling the Virtuozzo API, you receive a nested structure:
{
"data": {
"infos": [
{
"env": {
"displayName": "Production App",
"envName": "myapp-prod",
"shortdomain": "myapp",
"status": "RUNNING",
"uid": 12345,
"creatorUid": 12345,
"ownerUid": 12345,
"appid": "wp-cluster-123",
"domain": "myapp.jelastic.cloud",
"custdomain": "myapp.example.com",
"extdomains": ["www.example.com", "example.com"],
"createdOn": "2026-01-10 14:30:00",
"isFirewallEnabled": true,
"hostGroup": {
"uniqueName": "vz-eu-west-1",
"displayName": "EU West 1"
},
"contexts": [
{
"archivename": "myapp-backup-20260110"
}
]
},
"envGroups": ["production"],
"nodes": [
{
"id": 98765,
"name": "cp-node",
"nodeGroup": "cp",
"status": "RUNNING",
"fixedCloudlets": 4,
"flexibleCloudlets": 8,
"diskLimit": 10240,
"bandwidthLimit": 1024,
// ... many more node fields
}
]
}
]
}
}
Environment Model Mapping¶
Map the Virtuozzo API response to your Environment model:
| Database Column | Source | Notes |
|---|---|---|
| team_id | Job context | CRITICAL: Preserve existing team_id for existing records |
| envName | env.envName | Unique key - use for upsert matching |
| displayName | env.displayName | User-friendly name |
| shortdomain | env.shortdomain | Subdomain identifier |
| status | env.status | RUNNING, STOPPED, etc. |
| uid | env.uid | Virtuozzo user ID |
| creatorUid | env.creatorUid | Who created the environment |
| ownerUid | env.ownerUid | Current owner UID |
| appid | env.appid | Application template ID (may be string or int) |
| domain | env.domain | Default Jelastic domain |
| custdomain | env.custdomain | Custom domain if set |
| extdomains | env.extdomains | JSON array - external domains |
| createdOn | env.createdOn | ISO timestamp |
| isFirewallEnabled | env.isFirewallEnabled | Boolean |
| hostGroup_uniqueName | env.hostGroup.uniqueName | Region identifier |
| hostGroup_displayName | env.hostGroup.displayName | Region display name |
| archivename | env.contexts[0].archivename | Backup archive name (optional) |
| envGroups | envGroups array | JSON array - ["staging"], ["production"] |
| parent_env_id | Derived | For staging envs, link to production parent |
| session_key_encrypted | Job context | Owner's encrypted VZ session |
| synced_at | Current timestamp | Track sync freshness |
| source_version | Hash of API response | Detect changes |
Key Mapping Logic:
import json
from datetime import datetime, timezone


def map_environment_from_api(
    env_data: dict,
    env_groups: list[str],
    team_id: int,
    session_key_encrypted: str,
) -> dict:
    """Map Virtuozzo API data to Environment model."""
    # hostGroup may be missing or null; contexts may be missing, null, or an
    # empty list. Normalize both so the nested lookups below cannot raise.
    host_group = env_data.get("hostGroup") or {}
    contexts = env_data.get("contexts") or [{}]
    return {
        "team_id": team_id,
        "session_key_encrypted": session_key_encrypted,
        "envName": env_data.get("envName"),
        "displayName": env_data.get("displayName"),
        "shortdomain": env_data.get("shortdomain"),
        "status": env_data.get("status"),
        "uid": env_data.get("uid"),
        "creatorUid": env_data.get("creatorUid"),
        "ownerUid": env_data.get("ownerUid"),
        "appid": env_data.get("appid"),
        "domain": env_data.get("domain"),
        "custdomain": env_data.get("custdomain"),
        "extdomains": json.dumps(env_data.get("extdomains", [])),
        "createdOn": env_data.get("createdOn"),
        "isFirewallEnabled": env_data.get("isFirewallEnabled"),
        "hostGroup_uniqueName": host_group.get("uniqueName"),
        "hostGroup_displayName": host_group.get("displayName"),
        "archivename": contexts[0].get("archivename"),
        "envGroups": json.dumps(env_groups) if env_groups else None,
        "synced_at": datetime.now(timezone.utc),
        # parent_env_id computed separately for staging environments
    }
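The mapping table lists source_version as a hash of the API response used to detect changes, but this guide does not say how it is computed. One possible approach, shown here purely as an assumption, is a SHA-256 digest over a canonical JSON encoding of the per-environment entry. `compute_source_version` and `has_changed` are hypothetical helper names.

```python
import hashlib
import json
from typing import Any, Optional


def compute_source_version(info: dict[str, Any]) -> str:
    """Hash one `infos[]` entry in a way that ignores key order."""
    canonical = json.dumps(
        info,
        sort_keys=True,          # same content, same bytes, regardless of key order
        separators=(",", ":"),   # no whitespace differences
        default=str,             # stringify values json cannot encode natively
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def has_changed(stored_version: Optional[str], info: dict[str, Any]) -> bool:
    """True when the stored hash is missing or differs from the fresh payload."""
    return stored_version != compute_source_version(info)


if __name__ == "__main__":
    before = {"env": {"envName": "myapp-prod", "status": 1}, "nodes": []}
    after = {"env": {"envName": "myapp-prod", "status": 2}, "nodes": []}
    version = compute_source_version(before)
    print("unchanged:", not has_changed(version, before))
    print("changed:", has_changed(version, after))
```

Comparing hashes lets a sync worker skip the upsert for environments whose payload is byte-for-byte equivalent to the last sync.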
Node Model Mapping¶
Map node data to your Node model:
| Database Column | Source | Notes |
|---|---|---|
| environment_id | Computed | FK to environments.id |
| node_id | nodes[].id | Unique key - Virtuozzo node ID |
| name | nodes[].name | Node identifier (e.g., "cp-node") |
| nodeGroup | nodes[].nodeGroup | Node type: cp, bl, sqldb |
| status | nodes[].status | RUNNING, STOPPED |
| fixedCloudlets | nodes[].fixedCloudlets | Baseline resources (1 cloudlet = 128MB RAM) |
| flexibleCloudlets | nodes[].flexibleCloudlets | Scaling ceiling |
| diskLimit | nodes[].diskLimit | Disk limit (MB) |
| diskIoLimit | nodes[].diskIoLimit | IOPS limit |
| bandwidthLimit | nodes[].bandwidthLimit | Bandwidth limit (Mbps) |
| engineType | nodes[].engineType | nginx-php-8.1, mysql-8.0 |
| version | nodes[].version | Engine version |
| url | nodes[].url | Node access URL |
| adminUrl | nodes[].adminUrl | Admin panel URL |
| address | nodes[].address | Public IP |
| intIP | nodes[].intIP | Internal IP |
| extIPs | nodes[].extIPs | JSON array - External IPs |
| addons | nodes[].addons | JSON array - Installed addons |
| customitem | nodes[].customitem | JSON object - Custom metadata |
Key Mapping Logic:
import json


def map_node_from_api(node_data: dict, environment_id: int) -> dict:
    """Map Virtuozzo node data to Node model."""
    return {
        "environment_id": environment_id,
        # "id" is required: it is the unique key used for node upserts
        "node_id": node_data["id"],
        "name": node_data.get("name"),
        "nodeGroup": node_data.get("nodeGroup"),
        "status": node_data.get("status"),
        "fixedCloudlets": node_data.get("fixedCloudlets"),
        "flexibleCloudlets": node_data.get("flexibleCloudlets"),
        "diskLimit": node_data.get("diskLimit"),
        "diskIoLimit": node_data.get("diskIoLimit"),
        "bandwidthLimit": node_data.get("bandwidthLimit"),
        "engineType": node_data.get("engineType"),
        "version": node_data.get("version"),
        "url": node_data.get("url"),
        "adminUrl": node_data.get("adminUrl"),
        "address": node_data.get("address"),
        "intIP": node_data.get("intIP"),
        # JSON columns are stored as encoded strings
        "extIPs": json.dumps(node_data.get("extIPs", [])),
        "addons": json.dumps(node_data.get("addons", [])),
        "customitem": json.dumps(node_data.get("customitem", {})),
    }
Virtuozzo Service Implementation¶
Centralize all Virtuozzo API calls in the infrastructure layer:
# backend/app/infrastructure/external/virtuozzo/service.py
import httpx
from app.core.config import settings
from app.core.logging import get_logger
logger = get_logger(__name__)
class VirtuozzoService:
"""Client for Virtuozzo PaaS API."""
def __init__(self):
self.base_url = settings.virtuozzo_base_url
async def fetch_environments_and_nodes(
self,
session_key: str,
lazy: bool = False,
owner_uid: int | None = None
) -> dict:
"""
Fetch all environments and nodes for a user.
Calls: GET /environment/control/rest/getenvs
Args:
session_key: Decrypted Virtuozzo session key
lazy: If True, loads only basic metadata
owner_uid: Optional target user UID
Returns:
{
"result": 0,
"error": null,
"infos": [
{
"env": {...},
"envGroups": [...],
"nodes": [...]
}
]
}
Raises:
httpx.HTTPStatusError: If Virtuozzo API returns non-2xx
httpx.TimeoutException: If request times out
"""
params = {"session": session_key}
if lazy:
params["lazy"] = "true"
if owner_uid:
params["ownerUid"] = owner_uid
logger.info(
"calling_virtuozzo_getenvs",
lazy=lazy,
has_owner_uid=bool(owner_uid)
)
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
f"{self.base_url}/environment/control/rest/getenvs",
params=params
)
response.raise_for_status()
data = response.json()
# Check Virtuozzo-level errors
if data.get("result") != 0:
error_msg = data.get("error", "Unknown Virtuozzo error")
logger.error(
"virtuozzo_api_error",
result=data.get("result"),
error=error_msg
)
raise VirtuozzoAPIError(f"Virtuozzo API error: {error_msg}")
infos = data.get("infos", [])
logger.info(
"virtuozzo_getenvs_success",
env_count=len(infos)
)
return data
class VirtuozzoAPIError(Exception):
"""Raised when Virtuozzo API returns an error result."""
pass
RabbitMQ Job Integration¶
Use RabbitMQ for async environment operations (create, sync, modify):
Job Queue Structure:
# backend/app/infrastructure/messaging/queues.py
class JobQueues:
"""RabbitMQ queue definitions."""
# Environment sync jobs (priority queue)
ENVIRONMENTS_SYNC = "jobs.environments.sync"
# Environment creation jobs
ENVIRONMENTS_CREATE = "jobs.environments.create"
# Environment modification jobs
ENVIRONMENTS_MODIFY = "jobs.environments.modify"
# Routing keys for event publishing
class JobEvents:
"""Event routing keys for job status updates."""
@staticmethod
def sync_status(team_id: int) -> str:
return f"team.{team_id}.environments.sync.status"
@staticmethod
def create_status(team_id: int, job_id: str) -> str:
return f"team.{team_id}.environments.create.{job_id}.status"
Publishing Job to Queue:
# backend/app/domains/environments/service.py
async def trigger_environment_sync(
    db: AsyncSession,
    team_id: int,
    user_id: int,
    specific_env_name: str | None = None,
) -> dict:
    """
    Enqueue environment sync job to RabbitMQ.
    Returns immediately with job_id for tracking.
    """
    job_id = uuid.uuid4()

    # The team lookup needs a database session, so it is passed in explicitly.
    team = await db.get(Team, team_id)
    if not team or not team.session_key_encrypted:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Team missing Virtuozzo credentials",
        )

    job_payload = {
        "job_id": str(job_id),
        "team_id": team_id,
        "user_id": user_id,
        "session_key_encrypted": team.session_key_encrypted,
        "specific_env_name": specific_env_name,
        # Higher priority for targeted sync
        "priority": 100 if specific_env_name else 50,
    }

    # Publish to RabbitMQ
    await rabbitmq_client.publish(
        queue=JobQueues.ENVIRONMENTS_SYNC,
        message=job_payload,
        priority=job_payload["priority"],
    )
    logger.info(
        "sync_job_enqueued",
        job_id=str(job_id),
        team_id=team_id,
        specific_env=specific_env_name,
    )
    return {
        "job_id": str(job_id),
        "status": "queued",
        "message": "Environment sync queued",
    }
Worker Consuming Jobs:
# backend/app/modules/vz_sync/worker.py
async def consume_sync_jobs():
"""
RabbitMQ consumer for environment sync jobs.
Processes jobs from jobs.environments.sync queue.
"""
async with rabbitmq_client.channel() as channel:
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(
JobQueues.ENVIRONMENTS_SYNC,
durable=True,
arguments={"x-max-priority": 255}
)
async for message in queue:
async with message.process():
job_payload = json.loads(message.body)
try:
await execute_sync_job(job_payload)
except Exception as e:
logger.error(
"sync_job_failed",
job_id=job_payload["job_id"],
error=str(e),
exc_info=True
)
# Publish failure event
await publish_sync_status_event(
team_id=job_payload["team_id"],
job_id=job_payload["job_id"],
status="failed",
error=str(e)
)
Publishing Status Events:
async def publish_sync_status_event(
team_id: int,
job_id: str,
status: str,
error: str | None = None,
env_count: int | None = None
) -> None:
"""
Publish sync status to events exchange for SSE notifications.
Frontend subscribes to: /api/v1/events?team_id={team_id}
"""
event_payload = {
"event_type": "sync.status",
"job_id": job_id,
"team_id": team_id,
"status": status,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
if error:
event_payload["error"] = error
if env_count is not None:
event_payload["environment_count"] = env_count
routing_key = JobEvents.sync_status(team_id)
await rabbitmq_client.publish_event(
exchange="events.topic",
routing_key=routing_key,
message=event_payload
)
logger.info(
"sync_status_event_published",
job_id=job_id,
team_id=team_id,
status=status,
routing_key=routing_key
)
Data Validation Patterns¶
Environment Data Validation:
# backend/app/domains/environments/validators.py
class EnvironmentDataValidator:
"""Validate and sanitize Virtuozzo API data before DB insert."""
@staticmethod
def sanitize_environment_data(data: dict) -> dict:
"""
Ensure all fields are correct types and handle missing values.
Key Validations:
1. Required fields have fallback values
2. Numeric fields are actually numeric
3. JSON fields are properly encoded
4. Null parent_env_id is removed to preserve existing relationships
"""
# Required fields with fallbacks
data.setdefault("displayName", "Unnamed Environment")
data.setdefault("envName", f"env-{uuid.uuid4().hex[:8]}")
data.setdefault("status", "unknown")
# Numeric field validation
numeric_fields = ["uid", "creatorUid", "ownerUid"]
for field in numeric_fields:
if field in data and not isinstance(data[field], (int, type(None))):
# Try to convert to int
try:
data[field] = int(data[field])
except (ValueError, TypeError):
logger.warning(
f"invalid_numeric_field",
field=field,
value=data[field]
)
data[field] = None
# appid can be string or int - handle both
if "appid" in data and not isinstance(data["appid"], (int, str, type(None))):
data["appid"] = str(data["appid"])
# JSON array fields - ensure valid or remove
json_fields = ["extdomains", "envGroups"]
for field in json_fields:
if field in data:
if data[field] is None:
# Remove null to preserve existing DB value
del data[field]
elif isinstance(data[field], str):
# Validate JSON string
try:
json.loads(data[field])
except json.JSONDecodeError:
logger.warning(f"invalid_json_field", field=field)
del data[field]
elif not isinstance(data[field], list):
logger.warning(f"invalid_array_field", field=field)
del data[field]
# CRITICAL: Remove null parent_env_id to preserve existing relationships
if "parent_env_id" in data and data["parent_env_id"] is None:
del data["parent_env_id"]
return data
@staticmethod
def validate_team_ownership(
existing_env: Environment | None,
sync_team_id: int,
user_id: int
) -> int:
"""
Validate and return correct team_id for environment.
Rules:
1. Existing env: Preserve original team_id (prevent cross-team overwrite)
2. New env: Only allow if user owns the sync team
3. Returns: The correct team_id to use
"""
if existing_env:
if existing_env.team_id != sync_team_id:
logger.info(
"preserving_original_team_id",
env_name=existing_env.envName,
original_team_id=existing_env.team_id,
sync_team_id=sync_team_id
)
return existing_env.team_id
# New environment - verify user owns team
team = Team.query.get(sync_team_id)
if not team or team.user_id != user_id:
raise ValidationError(
f"User {user_id} cannot create environments for team {sync_team_id}"
)
return sync_team_id
@staticmethod
def link_staging_parent(
env_name: str,
env_groups: list[str],
team_id: int
) -> int | None:
"""
For staging environments, find and link to production parent.
Pattern: "staging7-myapp" -> parent is "myapp"
"""
if "staging" not in env_groups:
return None
# Extract parent name (e.g., staging7-myapp -> myapp)
parent_name = re.sub(r'^staging\d*-', '', env_name)
if parent_name == env_name:
return None # Couldn't extract parent name
parent_env = Environment.query.filter_by(
team_id=team_id,
envName=parent_name
).first()
if parent_env:
logger.info(
"staging_parent_linked",
staging_env=env_name,
parent_env=parent_name,
parent_id=parent_env.id
)
return parent_env.id
logger.warning(
"staging_parent_not_found",
staging_env=env_name,
expected_parent=parent_name
)
return None
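The name-extraction step of link_staging_parent can be isolated and tested without a database. The sketch below reuses the same regular expression; `extract_parent_name` is a hypothetical helper name, and returning None for an empty remainder is an added assumption.

```python
import re
from typing import Optional

# Same pattern as above: "staging", optional digits, then a hyphen.
_STAGING_PREFIX = re.compile(r"^staging\d*-")


def extract_parent_name(env_name: str, env_groups: list[str]) -> Optional[str]:
    """Return the production env name a staging env points at, or None."""
    if "staging" not in env_groups:
        return None
    parent_name = _STAGING_PREFIX.sub("", env_name, count=1)
    if not parent_name or parent_name == env_name:
        return None  # prefix absent, or nothing left after stripping it
    return parent_name


if __name__ == "__main__":
    for name in ("staging7-myapp", "staging-myapp", "myapp"):
        print(name, "->", extract_parent_name(name, ["staging"]))
```

Keeping this step pure makes the naming convention easy to unit test separately from the parent lookup query.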
Node Data Validation:
import json
import random
import re


class NodeDataValidator:
    """Validate node data before upsert."""

    @staticmethod
    def sanitize_node_data(data: dict) -> dict:
        """Ensure node fields are valid types."""
        # node_id MUST be integer
        if "node_id" in data and not isinstance(data["node_id"], int):
            # Capture the original value first so the warning below can report it
            original_id = data["node_id"]
            try:
                # Try to extract numeric part from mock IDs
                if isinstance(original_id, str):
                    match = re.search(r"(\d+)$", original_id)
                    if match:
                        data["node_id"] = int(match.group(1))
                    else:
                        raise ValueError("No numeric part found")
            except (ValueError, AttributeError):
                # Generate fallback ID
                data["node_id"] = random.randint(100000, 999999)
                logger.warning(
                    "generated_fallback_node_id",
                    original=original_id,
                    fallback=data["node_id"],
                )

        # Cloudlet fields must be integers
        cloudlet_fields = ["fixedCloudlets", "flexibleCloudlets"]
        for field in cloudlet_fields:
            if field in data and not isinstance(data[field], (int, type(None))):
                try:
                    data[field] = int(data[field])
                except (ValueError, TypeError):
                    logger.warning("invalid_cloudlet_value", field=field)
                    data[field] = None

        # JSON fields: encode anything that is not already a string or None
        json_fields = ["extIPs", "addons", "customitem"]
        for field in json_fields:
            if field in data and not isinstance(data[field], (str, type(None))):
                data[field] = json.dumps(data[field])
        return data
Sync Worker Flow Summary¶
async def execute_sync_job(job_payload: dict) -> None:
"""
Main sync worker logic (simplified).
Steps:
1. Validate team membership
2. Invalidate caches
3. Decrypt session key (with self-healing)
4. Fetch from Virtuozzo API (with cache-aside)
5. Validate & map data
6. Upsert environments and nodes
7. Cleanup orphaned records
8. Publish completion event
"""
team_id = job_payload["team_id"]
user_id = job_payload["user_id"]
# 1. Validate team membership
if not await team_auth_service.is_member(user_id, team_id):
await complete_without_retry("Not team member")
return
# 2. Invalidate caches
await invalidate_caches(team_id, user_id)
# 3. Decrypt session key
try:
session_key = await decrypt_with_self_healing(
job_payload["session_key_encrypted"],
team_id,
user_id
)
except CryptoError as e:
await fail_job(str(e))
return
# 4. Fetch from API
try:
response = await virtuozzo_service.fetch_environments_and_nodes(
session_key=session_key
)
except VirtuozzoAPIError as e:
await fail_job_with_retry(str(e))
return
# 5-7. Process data
    # The service returns the GetEnvs payload as-is, with infos at the top level
    infos = response.get("infos", [])
await sync_environments_and_nodes(team_id, user_id, infos)
# 8. Publish success event
await publish_sync_status_event(
team_id=team_id,
job_id=job_payload["job_id"],
status="completed",
env_count=len(infos)
)
Freshness Metadata¶
Environment responses should include sync status:
def _serialize_environment(env: Environment) -> dict[str, Any]:
return {
"name": env.name,
"display_name": env.display_name,
"status": env.status,
# ... other fields
"sync_status": {
"freshness": _calculate_freshness(env.synced_at),
"last_synced_at": env.synced_at.isoformat() if env.synced_at else None,
"is_stale": env.stale or False,
}
}
def _calculate_freshness(synced_at: datetime | None) -> str:
if not synced_at:
return "unknown"
age = datetime.now(timezone.utc) - synced_at
if age.total_seconds() < 600: # < 10 minutes
return "current"
elif age.total_seconds() < 3600: # < 1 hour
return "recent"
else:
return "stale"
Triggering Sync on Demand¶
Users whose role includes team.manage (by default only the Owner, per the permission matrix) can trigger a manual sync:
# POST /api/v1/teams/{team_id}/vz-sync
@router.post("/teams/{team_id}/vz-sync")
async def trigger_sync(
team_id: int,
current_user: Annotated[
AuthenticatedUser,
Depends(require_permission("team.manage"))
],
) -> dict[str, str]:
if current_user.team_id != team_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Cannot manage different team.",
)
result = await trigger_environment_sync(
team_id=team_id,
user_id=current_user.user_id
)
return result # {"job_id": "...", "status": "queued"}
Key Sync Worker Patterns (From Laravel Implementation)¶
The sync worker should implement these essential patterns:
1. Session Key Self-Healing
- Try decryption with multiple strategies
- Check if already in API format (167x...)
- Check if already raw/unencrypted
- Attempt Fernet decryption
- Fallback: Self-heal from User or Environment models
2. Cache-Aside Strategy
- Cache key: sync_data_{team_id} (10 minute TTL)
- If no DB environments exist → force API call
- Otherwise check cache first, call API if miss
- Log API response metadata for debugging
3. Team Ownership Preservation
- CRITICAL: Existing environments MUST preserve their original team_id
- Only set team_id for NEW environments
- Verify user owns team before creating new environments
- Prevents cross-team data leakage during sync
4. Staging Environment Linking
- Extract parent name from staging pattern: staging7-myapp → myapp
- Link parent_env_id to production environment
- Preserve existing parent_env_id if API doesn't provide it
5. Orphaned Resource Cleanup
- Only cleanup when doing FULL sync (not specific environment)
- Only cleanup environments belonging to current team_id
- Skip cleanup if environment has child staging environments
- Delete in order: SFTP users → Backups → Nodes → Environment
6. Cache Invalidation
- Invalidate BEFORE sync starts
- Keys to clear:
- sync_data_{team_id}
- team_{team_id}_pending_sites_exist
- team_{team_id}_site_creation_status_response
- sync_status_{user_id}
- DO NOT clear: sync_in_progress_{user_id} (only on completion)
7. Rate Limiting
- Max 1 sync per team every 60 seconds
- Use Redis SET with NX and EX for distributed locking
- Backend processes can bypass via flag
8. Memory Management
- Check memory usage periodically (default limit: 512 MB)
- Fail gracefully if exceeded
- Especially important for teams with 100+ environments
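The orphan cleanup rules in pattern 5 are easy to get wrong, so it can help to isolate the selection logic from the deletes. The following is a minimal sketch under stated assumptions: `CachedEnv` is a hypothetical stand-in for the Environment model, `select_orphans` is a hypothetical helper, and an environment counts as orphaned when its name is absent from the latest API response.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CachedEnv:
    """Hypothetical stand-in for a cached Environment row."""

    id: int
    team_id: int
    env_name: str
    parent_env_id: Optional[int] = None


def select_orphans(
    cached: list[CachedEnv],
    api_env_names: set[str],
    team_id: int,
    full_sync: bool,
) -> list[CachedEnv]:
    """Return cached environments that are safe to delete after a sync."""
    if not full_sync:
        # A targeted sync only saw one environment, so absence proves nothing.
        return []

    # IDs of environments that still have staging children pointing at them.
    parents_with_children = {
        env.parent_env_id for env in cached if env.parent_env_id is not None
    }
    return [
        env
        for env in cached
        if env.team_id == team_id                # never touch another team's rows
        and env.env_name not in api_env_names    # missing from the API response
        and env.id not in parents_with_children  # keep parents of staging envs
    ]


if __name__ == "__main__":
    rows = [
        CachedEnv(1, 7, "myapp"),
        CachedEnv(2, 7, "staging7-myapp", parent_env_id=1),
        CachedEnv(3, 7, "old-site"),
        CachedEnv(4, 8, "other-team-env"),
    ]
    orphans = select_orphans(rows, {"staging7-myapp"}, team_id=7, full_sync=True)
    print([env.env_name for env in orphans])
```

The caller would then delete each selected environment's dependents in the order given above: SFTP users, backups, nodes, and finally the environment itself.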
Triggering Sync on Demand¶
Members (with appropriate permissions) can trigger manual sync:
# POST /api/v1/teams/{team_id}/vz-sync
@router.post("/teams/{team_id}/vz-sync")
async def trigger_sync(
    team_id: int,
    current_user: Annotated[
        AuthenticatedUser,
        Depends(require_permission("team.manage"))
    ],
) -> dict[str, str]:
    if current_user.team_id != team_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Cannot manage different team.",
        )
    await sync_scheduler.force_sync(
        team_id=team_id,
        scope="full",
        reason="manual_api_request"
    )
    return {"status": "ok", "message": "Sync initiated"}
👥 Owner vs Member View Implementation¶
Unified View Model¶
Both owners and members see the same cached environment data. There is no separate "owner view" vs "member view" at the data level.
Differentiation happens at the permission level:
// frontend/src/features/environments/components/EnvironmentList.tsx
export function EnvironmentList({
  environments,
  currentUser
}: EnvironmentListProps) {
  const canCreate = currentUser.permissions.includes('server.create');
  const canDelete = currentUser.permissions.includes('server.delete');
  const canRestart = currentUser.permissions.includes('server.restart');
  return (
    <div>
      {canCreate && (
        <Button onClick={handleCreate}>Create Environment</Button>
      )}
      {environments.map(env => (
        <EnvironmentCard
          key={env.name}
          environment={env}
          actions={{
            restart: canRestart,
            delete: canDelete,
          }}
        />
      ))}
    </div>
  );
}
Permission-Based UI Elements¶
| UI Element | Visible To | Condition |
|---|---|---|
| "Create Environment" button | Members with server.create | Default: Owner, Manager, Developer |
| "Delete" button | Members with server.delete | Default: Owner, Manager only |
| "Restart" button | Members with server.restart | Default: Owner, Manager, Developer |
| "Manage Billing" link | Members with billing.edit | Default: Owner, Manager only |
| "Sync Now" button | Members with team.manage | Default: Owner only |
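The table can also be expressed as data, which makes the defaults easy to assert in tests. The sketch below is illustrative only: it transcribes just the permissions listed in the table (real roles may carry more), and `DEFAULT_ROLE_PERMISSIONS`, `UI_ELEMENT_PERMISSIONS`, `has_permission`, and `visible_ui_elements` are hypothetical names.

```python
# Default grants transcribed from the table; "*" is the Owner wildcard.
DEFAULT_ROLE_PERMISSIONS: dict[str, frozenset[str]] = {
    "Owner": frozenset({"*"}),
    "Manager": frozenset(
        {"server.create", "server.delete", "server.restart", "billing.edit"}
    ),
    "Developer": frozenset({"server.create", "server.restart"}),
}

# Permission required for each UI element, in table order.
UI_ELEMENT_PERMISSIONS: dict[str, str] = {
    "Create Environment": "server.create",
    "Delete": "server.delete",
    "Restart": "server.restart",
    "Manage Billing": "billing.edit",
    "Sync Now": "team.manage",
}


def has_permission(granted: frozenset[str], required: str) -> bool:
    """True if the permission is granted explicitly or via the wildcard."""
    return "*" in granted or required in granted


def visible_ui_elements(role_name: str) -> list[str]:
    """List the UI elements a default role would see."""
    granted = DEFAULT_ROLE_PERMISSIONS.get(role_name, frozenset())
    return [
        element
        for element, required in UI_ELEMENT_PERMISSIONS.items()
        if has_permission(granted, required)
    ]


print(visible_ui_elements("Developer"))  # ['Create Environment', 'Restart']
```

Note the wildcard branch: if the permissions list delivered to the frontend contains only `*` for owners, a plain `includes('server.create')` check would not match it. Whether the backend expands the wildcard before sending permissions is not stated in this section, so it is worth verifying.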
Frontend Implementation Pattern¶
// frontend/src/features/environments/hooks/useEnvironmentPermissions.ts
export function useEnvironmentPermissions() {
  const { user } = useAuth();
  return {
    canListEnvironments: true, // All team members
    canViewDetails: true,
    canCreate: user.permissions.includes('server.create'),
    canRestart: user.permissions.includes('server.restart'),
    canDelete: user.permissions.includes('server.delete'),
    canManageSync: user.permissions.includes('team.manage'),
    // Derived permissions
    hasFullAccess: user.role_name === 'Owner',
    isReadOnly: !user.permissions.some(p =>
      p.startsWith('server.') && p !== 'server.view'
    ),
  };
}
🔨 Implementation Checklist¶
Backend Tasks¶
- Repository Layer (backend/app/domains/environments/repository.py)
  - list_team_environments(team_id) with sync metadata
  - get_environment_by_name(team_id, env_name)
  - shortdomain_exists(team_id, shortdomain)
  - has_active_job_for_shortdomain(team_id, shortdomain)
  - mark_environment_stale(team_id, env_name)
- Service Layer (backend/app/domains/environments/service.py)
  - list_environments() - Read from cache with freshness
  - get_environment_details() - Single environment view
  - enqueue_environment_creation() - Job dispatch
  - Include sync status in all responses
- Router (backend/app/api/v1/environments.py)
  - GET /environments - List (team membership required)
  - GET /environments/{env_name} - Details (team membership)
  - POST /environments - Create (server.create permission)
  - DELETE /environments/{env_name} - Delete (server.delete permission)
  - POST /environments/{env_name}/restart - Restart (server.restart permission)
- Job Worker (backend/app/domains/environments/worker.py)
  - Decrypt owner session key
  - Call Virtuozzo API with owner credentials
  - Poll for environment status
  - Publish SSE events for job progress
  - Insert environment record on success
  - Log failures with structured logging
- Sync Integration
  - Extend vz_sync_states to track environment sync
  - Worker updates synced_at and source_version on each sync
  - Manual sync endpoint with team.manage permission
Frontend Tasks¶
- Type Definitions (frontend/src/features/environments/types/environment.types.ts)
  - Environment interface with sync metadata
  - EnvironmentCreateRequest schema
  - EnvironmentJobStatus enum
  - SyncStatus interface
- API Client (frontend/src/features/environments/services/environmentService.ts)
  - getEnvironments()
  - getEnvironmentDetails(envName)
  - createEnvironment(payload)
  - deleteEnvironment(envName)
  - restartEnvironment(envName)
  - triggerSync()
- Components
  - EnvironmentList - Table with sync badges
  - EnvironmentCard - Individual environment display
  - CreateEnvironmentDialog - Form with validation
  - EnvironmentSyncBadge - Freshness indicator
  - JobStatusTracker - SSE-based progress display
- Permission Guards
  - useEnvironmentPermissions() hook
  - Conditional rendering based on permissions
  - Disable actions for insufficient permissions
🔍 Testing Strategy¶
Backend Tests¶
- Unit Tests (backend/tests/unit/domains/environments/)
  - test_list_environments_team_scoping()
  - test_create_environment_permission_check()
  - test_create_environment_validates_vz_credentials()
  - test_sync_status_calculation()
- Integration Tests (backend/tests/integration/domains/environments/)
  - test_create_environment_job_flow_e2e()
  - test_member_access_via_team_membership()
  - test_sync_updates_environment_freshness()
- Permission Tests
  - test_member_cannot_delete_without_permission()
  - test_owner_has_wildcard_access()
  - test_custom_role_with_server_create()
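As an illustration of what the team-scoping unit test might assert, the sketch below uses an in-memory stand-in for the repository. `EnvRecord` and `InMemoryEnvironmentRepository` are hypothetical; the real test would exercise the actual repository against a test database, so this shows only the shape of the assertions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EnvRecord:
    """Hypothetical minimal environment row."""

    team_id: int
    name: str


class InMemoryEnvironmentRepository:
    """Hypothetical stand-in for the real repository, for illustration only."""

    def __init__(self, records: list[EnvRecord]) -> None:
        self._records = list(records)

    def list_team_environments(self, team_id: int) -> list[EnvRecord]:
        # Every read is filtered by team_id; there is no unscoped listing.
        return [r for r in self._records if r.team_id == team_id]


def test_list_environments_team_scoping() -> None:
    repo = InMemoryEnvironmentRepository(
        [
            EnvRecord(team_id=1, name="myapp"),
            EnvRecord(team_id=1, name="staging7-myapp"),
            EnvRecord(team_id=2, name="other-team-app"),
        ]
    )
    names = [env.name for env in repo.list_team_environments(team_id=1)]
    # Team 1 sees only its own environments, never team 2's.
    assert names == ["myapp", "staging7-myapp"]
    # A team with no environments gets an empty list, not another team's data.
    assert repo.list_team_environments(team_id=3) == []
```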
Frontend Tests¶
- Component Tests
  - Permission-based rendering
  - Create dialog validation
  - SSE job status updates
- Integration Tests
  - E2E environment creation flow
  - Permission checks across member types
  - Sync trigger and status update
📚 Reference Documentation¶
- RBAC System: docs/RBAC_IMPLEMENTATION_SUMMARY.md
- Team Management: docs/TEAM_MEMBERS_MANAGEMENT_GUIDE.md
- Sync Architecture: docs/architecture/CACHING/environments-sync.md
- Auth System: docs/architecture/AUTH-SYSTEM/team_management_auth_rbac.md
- Job Pattern: Referenced in sync architecture and SSE notification docs
🚨 Common Pitfalls¶
1. Direct Virtuozzo Calls from Frontend¶
❌ Wrong:
// Frontend code
const response = await fetch('https://virtuozzo.api.com/environments', {
  headers: { 'X-Session': ownerSessionKey }
});
✅ Correct:
// Frontend code
const environments = await apiClient.get('/api/v1/environments');
// Backend proxies the call with owner credentials
2. Missing Team Context Validation¶
❌ Wrong:
async def list_environments(current_user: AuthenticatedUser):
    # Directly querying without team check
    return await db.execute(select(Environment))
✅ Correct:
async def list_environments(current_user: AuthenticatedUser):
    if current_user.team_id is None:
        raise HTTPException(403, "Missing team context")
    return await repository.list_team_environments(
        team_id=current_user.team_id
    )
3. Forgetting Permission Checks¶
❌ Wrong:
@router.delete("/environments/{env_name}")
async def delete_environment(
    env_name: str,
    current_user: AuthenticatedUser = Depends(get_current_user)
):
    # Anyone can delete!
    ...
✅ Correct:
@router.delete("/environments/{env_name}")
async def delete_environment(
    env_name: str,
    current_user: AuthenticatedUser = Depends(
        require_permission("server.delete")
    )
):
    ...
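The body of `require_permission` is not shown in this section. As a rough, framework-free sketch of the idea, the factory below returns a checker that passes the user through or raises, and logs the denial in line with the "log security violations" practice. `User`, `PermissionDenied`, and `require_permission_sketch` are hypothetical names; in FastAPI the checker would be wired in through `Depends` and would raise `HTTPException` with status 403 instead.

```python
import logging
from dataclasses import dataclass, field
from typing import Callable

logger = logging.getLogger("rbac")


class PermissionDenied(Exception):
    """Raised when the caller lacks the required permission."""


@dataclass
class User:
    """Hypothetical stand-in for AuthenticatedUser."""

    id: int
    team_id: int
    permissions: set[str] = field(default_factory=set)


def require_permission_sketch(required: str) -> Callable[[User], User]:
    """Build a checker that enforces one permission (assumed semantics)."""

    def checker(user: User) -> User:
        # The Owner wildcard "*" satisfies any permission.
        if "*" in user.permissions or required in user.permissions:
            return user
        logger.warning(
            "permission_denied user_id=%s team_id=%s required=%s",
            user.id,
            user.team_id,
            required,
        )
        raise PermissionDenied(required)

    return checker
```

Returning the user from the checker mirrors how the endpoints above receive `current_user` directly from the dependency.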
4. Not Including Sync Metadata¶
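❌ Wrong: returning each environment with only its own fields (for example name and status) and no sync_status block, so clients cannot tell how fresh the data is.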
✅ Correct:
return {
    "items": [{
        "name": env.name,
        "status": env.status,
        "sync_status": {
            "freshness": "current",
            # Serialize the timestamp, matching _serialize_environment
            "last_synced_at": env.synced_at.isoformat() if env.synced_at else None
        }
    }]
}
💡 Best Practices¶
- Always validate team context before querying environments
- Use require_permission() dependency for all mutating operations
- Include sync metadata in all environment responses
- Log security violations when permission checks fail
- Use job system for any operation that calls Virtuozzo (>2s response time)
- Decrypt credentials in service layer, never pass encrypted keys to workers
- Publish SSE events for async job status updates
- Test with multiple roles (Owner, Manager, Developer) during development
- Document permission requirements in API docstrings
- Follow existing patterns from team management and RBAC implementations
📞 Support & Questions¶
For implementation questions or clarifications:
- Review existing implementations in backend/app/domains/environments/
- Check RBAC patterns in backend/app/modules/rbac/
- Reference team management implementation in backend/app/modules/team_management/
- Consult sync architecture doc for job system patterns