Skip to content

Environment API Implementation Guide

🎯 Overview

This document provides comprehensive guidance on implementing environment-related APIs in the MBPanel system, covering: - How environment data is accessed from owner accounts - How invited team members view environments - Permission-based access control for environments - Job system integration for async operations - Sync architecture and staleness management

📐 Architecture Context

Team-Centric Model

MBPanel uses a Team-Centric Architecture where: - Owner: The user who created the team and owns the Virtuozzo account - Members: Invited users who access resources through team membership - Environments: Infrastructure resources owned by the team (via Virtuozzo)

graph TD
    Owner[Owner User] -->|Creates| Team[Team Entity]
    Team -->|Stores| VZCreds[VZ Credentials Encrypted]
    VZCreds -->|Authenticates| Virtuozzo[Virtuozzo Infrastructure]
    Virtuozzo -->|Manages| Envs[Environments]

    Member1[Member User] -->|Joins| Team
    Member2[Member User] -->|Joins| Team

    Member1 -->|Reads via| Cache[Environment Cache]
    Member2 -->|Reads via| Cache
    Owner -->|Reads via| Cache

    Cache -->|Syncs from| Virtuozzo

    Team -->|Controls Access| RBAC[RBAC System]
    RBAC -->|Enforces| Permissions[Permissions]

Key Principles

  1. Single Source of Truth: Virtuozzo is the authoritative source for environment data
  2. Backend Proxy Pattern: Frontend never calls Virtuozzo directly; backend acts as secure proxy
  3. Cache-Aside with Sync: Postgres caches Virtuozzo data with explicit freshness tracking
  4. Permission-Based Access: All operations are gated by role-based permissions
  5. Job-Based Async: Long-running operations (create, modify) use job system

🔐 Access Control & Permissions

Owner Account Access

The owner is the team member who: - Registered with Virtuozzo credentials OR created a new Virtuozzo account - Has their Virtuozzo session key stored encrypted in teams.session_key_encrypted - Has their Virtuozzo UID stored in teams.vz_uid - Automatically gets the "Owner" role with * (wildcard) permission

Critical Implementation Notes: - Owner credentials are encrypted at rest using Fernet encryption (see app/core/crypto.py) - Session keys are refreshed and re-encrypted on owner login - Only the owner's session key is used for Virtuozzo API calls (per Virtuozzo multi-user model)

Member Access

Invited members access environments through: 1. Team Membership: Link in team_members table 2. Role Assignment: Each member has a role (Owner, Manager, Developer, or custom) 3. Permission Inheritance: Permissions are derived from their role

Key Difference from Owner: - Members do NOT have personal Virtuozzo credentials - Members read from the same cached environment data as the owner - Members' actions are proxied through the backend using the owner's Virtuozzo session

Permission Matrix

Operation Required Permission Owner Manager Developer
List environments (team membership)
View environment details (team membership)
Create environment server.create
Restart environment server.restart
Delete environment server.delete
Modify team billing billing.edit
Invite members team.invite
Manage roles team.manage

Note: The permission matrix can be customized per team using the RBAC system. The above reflects the default role permissions seeded during team creation.


🛠️ API Endpoint Implementation

Virtuozzo API Integration

Before implementing our backend endpoints, understand the underlying Virtuozzo API:

Virtuozzo GetEnvs Endpoint:

GET /environment/control/rest/getenvs

Parameters: - session (required): User session key or personal access token - lazy (optional): true = load only basic metadata, false = full environment info - appid (optional): Unique application identifier (for authentication) - ownerUid (optional): Target user account UID

Example Call:

# backend/app/infrastructure/external/virtuozzo/service.py
async def fetch_environments_and_nodes(
    session_key: str,
    lazy: bool = False,
    owner_uid: int | None = None
) -> dict:
    """
    Call Virtuozzo GetEnvs API.

    Returns nested structure with environments, nodes, and metadata.
    """
    params = {"session": session_key}
    if lazy:
        params["lazy"] = "true"
    if owner_uid:
        params["ownerUid"] = owner_uid

    async with create_external_async_client() as client:
        response = await client.get(
            f"{settings.virtuozzo_base_url}/environment/control/rest/getenvs",
            params=params,
            timeout=30.0
        )
        response.raise_for_status()
        return response.json()

Virtuozzo Response Structure:

{
  "result": 0,
  "error": null,
  "infos": [
    {
      "env": {
        "envName": "myapp-prod",
        "displayName": "Production App",
        "shortdomain": "myapp",
        "status": 1,
        "uid": 12345,
        "creatorUid": 12345,
        "appid": "wp-cluster",
        "domain": "myapp.jelastic.cloud",
        "extDomains": [{"extdomains": "www.example.com"}],
        "isFirewallEnabled": true,
        "createdOn": "2026-01-10 14:30:00",
        // ... many more fields
      },
      "envGroups": ["production"],
      "nodes": [
        {
          "id": 98765,
          "name": "cp-node",
          "nodeGroup": "cp",
          "status": 1,
          "fixedCloudlets": 4,
          "flexibleCloudlets": 8,
          "address": "192.168.1.10",
          "intIP": "10.0.0.5",
          "extIps": [{"extIPs": "203.0.113.25"}],
          // ... many more fields
        }
      ]
    }
  ]
}

Response Field Mapping:

Virtuozzo Field Type Maps To Notes
result int - 0 = success, non-zero = error
error string - Error message if result != 0
infos array Environments list Array of environment objects
infos[].env object Environment model Core environment data
infos[].envGroups array Environment.envGroups Tags like ["staging"], ["production"]
infos[].nodes array Node model Array of node objects

1. List Environments

Endpoint: GET /api/v1/environments

Purpose: Retrieve all environments for the current team from cached data.

Implementation:

# backend/app/api/v1/environments.py
@router.get("/", summary="List environments for the current team")
async def list_environments(
    db: Annotated[AsyncSession, Depends(db_session)],
    current_user: Annotated[AuthenticatedUser, Depends(get_current_user)],
) -> dict[str, list]:
    items = await service.list_environments(db=db, current_user=current_user)
    return {"items": items}

Service Layer:

# backend/app/domains/environments/service.py
async def list_environments(
    *,
    db: AsyncSession,
    current_user: AuthenticatedUser
) -> list[dict[str, Any]]:
    """
    List all environments for the current user's team.

    Data Flow:
    1. Validate team context exists
    2. Query cached environments from Postgres (NOT Virtuozzo API)
    3. Serialize with sync metadata
    4. Return to frontend

    Note: This endpoint reads from cache. To refresh data, trigger a sync job.
    """
    if current_user.team_id is None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Missing team context."
        )

    rows = await repository.list_team_environments(
        db=db,
        team_id=int(current_user.team_id)
    )
    return [_serialize_environment(env) for env in rows]

Key Implementation Points:

  1. Team Context Validation: Always verify current_user.team_id is present
  2. No Direct VZ Calls: Read from Postgres cache (environments table), NOT Virtuozzo API
  3. Automatic Team Scoping: Repository filters by team_id automatically
  4. Freshness Metadata: Include sync status in response (see Sync section below)
  5. Sync vs List:
  6. List endpoint = Fast read from cache
  7. Sync job = Calls Virtuozzo GetEnvs API and updates cache

Response Schema:

{
  "items": [
    {
      "name": "myapp-prod",
      "display_name": "Production App",
      "status": "RUNNING",
      "created_at": "2026-01-10T14:30:00Z",
      "cloudlets_baseline": 4,
      "cloudlets_max": 8,
      "sync_status": {
        "freshness": "current",
        "last_synced_at": "2026-01-15T10:00:00Z"
      }
    }
  ]
}

2. Create Environment (Async Job)

Endpoint: POST /api/v1/environments

Purpose: Provision a new environment (asynchronous operation using job system).

Implementation:

# backend/app/api/v1/environments.py
@router.post(
    "/",
    response_model=EnvironmentCreateResponse,
    status_code=status.HTTP_202_ACCEPTED,
    summary="Provision a new environment (async job)",
)
async def create_environment(
    request: EnvironmentCreateRequest,
    db: Annotated[AsyncSession, Depends(db_session)],
    current_user: Annotated[
        AuthenticatedUser,
        Depends(require_permission("server.create"))
    ],
) -> EnvironmentCreateResponse:
    return await service.enqueue_environment_creation(
        db=db,
        current_user=current_user,
        payload=request
    )

Permission Enforcement: - Uses require_permission("server.create") dependency - Automatically checks if user's role includes this permission - Returns 403 if permission is missing

Service Implementation:

async def enqueue_environment_creation(
    *,
    db: AsyncSession,
    current_user: AuthenticatedUser,
    payload: EnvironmentCreateRequest,
) -> EnvironmentCreateResponse:
    # 1. Validate team context
    if current_user.team_id is None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Missing team context."
        )

    # 2. Load team and validate state
    team = await db.get(Team, int(current_user.team_id))
    if not team:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Team not found."
        )

    # 3. Validate team has required Virtuozzo credentials
    if not team.session_key_encrypted:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Team missing Virtuozzo session key. Complete onboarding first.",
        )
    if not team.vz_uid:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Team missing Virtuozzo UID. Complete onboarding first.",
        )

    # 4. Validate team subscription status
    if team.status not in {"TRIALING", "ACTIVE"}:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Team subscription is not active.",
        )

    # 5. Check for naming conflicts
    shortdomain = payload.shortdomain.lower()
    if await repository.shortdomain_exists(
        db=db,
        team_id=team.id,
        shortdomain=shortdomain
    ):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Shortdomain already exists."
        )

    # 6. Check for in-flight jobs for same shortdomain
    if await repository.has_active_job_for_shortdomain(
        db=db,
        team_id=team.id,
        shortdomain=shortdomain
    ):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Provisioning already in progress for this name.",
        )

    # 7. Create job record and enqueue
    job_id = uuid.uuid4()
    job_payload = CreateEnvironmentJobPayload(
        job_id=job_id,
        team_id=team.id,
        user_id=current_user.user_id,
        display_name=payload.display_name,
        shortdomain=shortdomain,
        owner_uid=team.vz_uid,
        session_key_encrypted=team.session_key_encrypted,
        env=_build_env_spec(payload),
        nodes=_build_nodes_spec(payload),
    )

    # 8. Record job log
    await _insert_job_log(db, job_id, team.id, current_user.user_id, ...)

    # 9. Publish status event (SSE notification)
    await _publish_job_status_event(
        team_id=team.id,
        job_id=job_id,
        status=EnvironmentJobStatus.QUEUED,
        message="Provisioning request queued.",
        env_name=shortdomain,
    )

    # 10. Dispatch to job queue
    _dispatch_job(job_payload)

    return EnvironmentCreateResponse(
        job_id=job_id,
        status=EnvironmentJobStatus.QUEUED
    )

Response (202 Accepted):

{
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "queued"
}

Job Status Tracking:

Clients can track job progress via: 1. SSE Events: Subscribe to /api/v1/events?job_id={job_id} 2. Polling: Query job status (future endpoint)

Example SSE event stream:

event: job.status
data: {"job_id": "...", "status": "queued", "message": "Job queued"}

event: job.status
data: {"job_id": "...", "status": "started", "message": "Calling Virtuozzo API"}

event: job.status
data: {"job_id": "...", "status": "polling", "message": "Waiting for environment activation"}

event: job.status
data: {"job_id": "...", "status": "completed", "env_name": "myapp-prod", "app_id": "123"}

🔄 Data Mapping & Sync Patterns

Virtuozzo API Response Structure

When calling the Virtuozzo API, you receive a nested structure:

{
  "data": {
    "infos": [
      {
        "env": {
          "displayName": "Production App",
          "envName": "myapp-prod",
          "shortdomain": "myapp",
          "status": "RUNNING",
          "uid": 12345,
          "creatorUid": 12345,
          "ownerUid": 12345,
          "appid": "wp-cluster-123",
          "domain": "myapp.jelastic.cloud",
          "custdomain": "myapp.example.com",
          "extdomains": ["www.example.com", "example.com"],
          "createdOn": "2026-01-10 14:30:00",
          "isFirewallEnabled": true,
          "hostGroup": {
            "uniqueName": "vz-eu-west-1",
            "displayName": "EU West 1"
          },
          "contexts": [
            {
              "archivename": "myapp-backup-20260110"
            }
          ]
        },
        "envGroups": ["production"],
        "nodes": [
          {
            "id": 98765,
            "name": "cp-node",
            "nodeGroup": "cp",
            "status": "RUNNING",
            "fixedCloudlets": 4,
            "flexibleCloudlets": 8,
            "diskLimit": 10240,
            "bandwidthLimit": 1024,
            // ... many more node fields
          }
        ]
      }
    ]
  }
}

Environment Model Mapping

Map the Virtuozzo API response to your Environment model:

Database Column Source Notes
team_id Job context CRITICAL: Preserve existing team_id for existing records
envName env.envName Unique key - use for upsert matching
displayName env.displayName User-friendly name
shortdomain env.shortdomain Subdomain identifier
status env.status RUNNING, STOPPED, etc.
uid env.uid Virtuozzo user ID
creatorUid env.creatorUid Who created the environment
ownerUid env.ownerUid Current owner UID
appid env.appid Application template ID (may be string or int)
domain env.domain Default Jelastic domain
custdomain env.custdomain Custom domain if set
extdomains env.extdomains JSON array - external domains
createdOn env.createdOn ISO timestamp
isFirewallEnabled env.isFirewallEnabled Boolean
hostGroup_uniqueName env.hostGroup.uniqueName Region identifier
hostGroup_displayName env.hostGroup.displayName Region display name
archivename env.contexts[0].archivename Backup archive name (optional)
envGroups envGroups array JSON array - ["staging"], ["production"]
parent_env_id Derived For staging envs, link to production parent
session_key_encrypted Job context Owner's encrypted VZ session
synced_at Current timestamp Track sync freshness
source_version Hash of API response Detect changes

Key Mapping Logic:

def map_environment_from_api(
    env_data: dict,
    env_groups: list[str],
    team_id: int,
    session_key_encrypted: str
) -> dict:
    """Map Virtuozzo API data to Environment model."""
    return {
        "team_id": team_id,
        "session_key_encrypted": session_key_encrypted,
        "envName": env_data.get("envName"),
        "displayName": env_data.get("displayName"),
        "shortdomain": env_data.get("shortdomain"),
        "status": env_data.get("status"),
        "uid": env_data.get("uid"),
        "creatorUid": env_data.get("creatorUid"),
        "ownerUid": env_data.get("ownerUid"),
        "appid": env_data.get("appid"),
        "domain": env_data.get("domain"),
        "custdomain": env_data.get("custdomain"),
        "extdomains": json.dumps(env_data.get("extdomains", [])),
        "createdOn": env_data.get("createdOn"),
        "isFirewallEnabled": env_data.get("isFirewallEnabled"),
        "hostGroup_uniqueName": env_data.get("hostGroup", {}).get("uniqueName"),
        "hostGroup_displayName": env_data.get("hostGroup", {}).get("displayName"),
        "archivename": env_data.get("contexts", [{}])[0].get("archivename"),
        "envGroups": json.dumps(env_groups) if env_groups else None,
        "synced_at": datetime.now(timezone.utc),
        # parent_env_id computed separately for staging environments
    }

Node Model Mapping

Map node data to your Node model:

Database Column Source Notes
environment_id Computed FK to environments.id
node_id nodes[].id Unique key - Virtuozzo node ID
name nodes[].name Node identifier (e.g., "cp-node")
nodeGroup nodes[].nodeGroup Node type: cp, bl, sqldb
status nodes[].status RUNNING, STOPPED
fixedCloudlets nodes[].fixedCloudlets Baseline resources (1 cloudlet = 128MB RAM)
flexibleCloudlets nodes[].flexibleCloudlets Scaling ceiling
diskLimit nodes[].diskLimit MB
diskIoLimit nodes[].diskIoLimit IOPS limit
bandwidthLimit nodes[].bandwidthLimit Mbps
engineType nodes[].engineType nginx-php-8.1, mysql-8.0
version nodes[].version Engine version
url nodes[].url Node access URL
adminUrl nodes[].adminUrl Admin panel URL
address nodes[].address Public IP
intIP nodes[].intIP Internal IP
extIPs nodes[].extIPs JSON array - External IPs
addons nodes[].addons JSON array - Installed addons
customitem nodes[].customitem JSON object - Custom metadata

Key Mapping Logic:

def map_node_from_api(node_data: dict, environment_id: int) -> dict:
    """Map Virtuozzo node data to Node model."""
    return {
        "environment_id": environment_id,
        "node_id": node_data["id"],
        "name": node_data.get("name"),
        "nodeGroup": node_data.get("nodeGroup"),
        "status": node_data.get("status"),
        "fixedCloudlets": node_data.get("fixedCloudlets"),
        "flexibleCloudlets": node_data.get("flexibleCloudlets"),
        "diskLimit": node_data.get("diskLimit"),
        "diskIoLimit": node_data.get("diskIoLimit"),
        "bandwidthLimit": node_data.get("bandwidthLimit"),
        "engineType": node_data.get("engineType"),
        "version": node_data.get("version"),
        "url": node_data.get("url"),
        "adminUrl": node_data.get("adminUrl"),
        "address": node_data.get("address"),
        "intIP": node_data.get("intIP"),
        "extIPs": json.dumps(node_data.get("extIPs", [])),
        "addons": json.dumps(node_data.get("addons", [])),
        "customitem": json.dumps(node_data.get("customitem", {})),
    Virtuozzo Service Implementation

Centralize all Virtuozzo API calls in the infrastructure layer:

```python
# backend/app/infrastructure/external/virtuozzo/service.py
import httpx
from app.core.config import settings
from app.core.logging import get_logger

logger = get_logger(__name__)


class VirtuozzoService:
    """Client for Virtuozzo PaaS API."""

    def __init__(self):
        self.base_url = settings.virtuozzo_base_url

    async def fetch_environments_and_nodes(
        self,
        session_key: str,
        lazy: bool = False,
        owner_uid: int | None = None
    ) -> dict:
        """
        Fetch all environments and nodes for a user.

        Calls: GET /environment/control/rest/getenvs

        Args:
            session_key: Decrypted Virtuozzo session key
            lazy: If True, loads only basic metadata
            owner_uid: Optional target user UID

        Returns:
            {
                "result": 0,
                "error": null,
                "infos": [
                    {
                        "env": {...},
                        "envGroups": [...],
                        "nodes": [...]
                    }
                ]
            }

        Raises:
            httpx.HTTPStatusError: If Virtuozzo API returns non-2xx
            httpx.TimeoutException: If request times out
        """
        params = {"session": session_key}
        if lazy:
            params["lazy"] = "true"
        if owner_uid:
            params["ownerUid"] = owner_uid

        logger.info(
            "calling_virtuozzo_getenvs",
            lazy=lazy,
            has_owner_uid=bool(owner_uid)
        )

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                f"{self.base_url}/environment/control/rest/getenvs",
                params=params
            )
            response.raise_for_status()
            data = response.json()

        # Check Virtuozzo-level errors
        if data.get("result") != 0:
            error_msg = data.get("error", "Unknown Virtuozzo error")
            logger.error(
                "virtuozzo_api_error",
                result=data.get("result"),
                error=error_msg
            )
            raise VirtuozzoAPIError(f"Virtuozzo API error: {error_msg}")

        infos = data.get("infos", [])
        logger.info(
            "virtuozzo_getenvs_success",
            env_count=len(infos)
        )

        return data


class VirtuozzoAPIError(Exception):
    """Raised when Virtuozzo API returns an error result."""
    pass

}

### RabbitMQ Job Integration

Use RabbitMQ for async environment operations (create, sync, modify):

**Job Queue Structure:**

```python
# backend/app/infrastructure/messaging/queues.py

class JobQueues:
    """RabbitMQ queue definitions."""

    # Environment sync jobs (priority queue)
    ENVIRONMENTS_SYNC = "jobs.environments.sync"

    # Environment creation jobs
    ENVIRONMENTS_CREATE = "jobs.environments.create"

    # Environment modification jobs
    ENVIRONMENTS_MODIFY = "jobs.environments.modify"


# Routing keys for event publishing
class JobEvents:
    """Event routing keys for job status updates."""

    @staticmethod
    def sync_status(team_id: int) -> str:
        return f"team.{team_id}.environments.sync.status"

    @staticmethod
    def create_status(team_id: int, job_id: str) -> str:
        return f"team.{team_id}.environments.create.{job_id}.status"

Publishing Job to Queue:

# backend/app/domains/environments/service.py

async def trigger_environment_sync(
    team_id: int,
    user_id: int,
    specific_env_name: str | None = None
) -> dict:
    """
    Enqueue environment sync job to RabbitMQ.

    Returns immediately with job_id for tracking.
    """
    job_id = uuid.uuid4()
    team = await db.get(Team, team_id)

    if not team or not team.session_key_encrypted:
        raise HTTPException(400, "Team missing Virtuozzo credentials")

    job_payload = {
        "job_id": str(job_id),
        "team_id": team_id,
        "user_id": user_id,
        "session_key_encrypted": team.session_key_encrypted,
        "specific_env_name": specific_env_name,
        "priority": 100 if specific_env_name else 50,  # Higher priority for targeted sync
    }

    # Publish to RabbitMQ
    await rabbitmq_client.publish(
        queue=JobQueues.ENVIRONMENTS_SYNC,
        message=job_payload,
        priority=job_payload["priority"]
    )

    logger.info(
        "sync_job_enqueued",
        job_id=str(job_id),
        team_id=team_id,
        specific_env=specific_env_name
    )

    return {
        "job_id": str(job_id),
        "status": "queued",
        "message": "Environment sync queued"
    }

Worker Consuming Jobs:

# backend/app/modules/vz_sync/worker.py

async def consume_sync_jobs():
    """
    RabbitMQ consumer for environment sync jobs.

    Processes jobs from jobs.environments.sync queue.
    """
    async with rabbitmq_client.channel() as channel:
        await channel.set_qos(prefetch_count=1)

        queue = await channel.declare_queue(
            JobQueues.ENVIRONMENTS_SYNC,
            durable=True,
            arguments={"x-max-priority": 255}
        )

        async for message in queue:
            async with message.process():
                job_payload = json.loads(message.body)

                try:
                    await execute_sync_job(job_payload)
                except Exception as e:
                    logger.error(
                        "sync_job_failed",
                        job_id=job_payload["job_id"],
                        error=str(e),
                        exc_info=True
                    )
                    # Publish failure event
                    await publish_sync_status_event(
                        team_id=job_payload["team_id"],
                        job_id=job_payload["job_id"],
                        status="failed",
                        error=str(e)
                    )

Publishing Status Events:

async def publish_sync_status_event(
    team_id: int,
    job_id: str,
    status: str,
    error: str | None = None,
    env_count: int | None = None
) -> None:
    """
    Publish sync status to events exchange for SSE notifications.

    Frontend subscribes to: /api/v1/events?team_id={team_id}
    """
    event_payload = {
        "event_type": "sync.status",
        "job_id": job_id,
        "team_id": team_id,
        "status": status,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

    if error:
        event_payload["error"] = error
    if env_count is not None:
        event_payload["environment_count"] = env_count

    routing_key = JobEvents.sync_status(team_id)

    await rabbitmq_client.publish_event(
        exchange="events.topic",
        routing_key=routing_key,
        message=event_payload
    )

    logger.info(
        "sync_status_event_published",
        job_id=job_id,
        team_id=team_id,
        status=status,
        routing_key=routing_key
    )

Data Validation Patterns

Environment Data Validation:

# backend/app/domains/environments/validators.py

class EnvironmentDataValidator:
    """Validate and sanitize Virtuozzo API data before DB insert."""

    @staticmethod
    def sanitize_environment_data(data: dict) -> dict:
        """
        Ensure all fields are correct types and handle missing values.

        Key Validations:
        1. Required fields have fallback values
        2. Numeric fields are actually numeric
        3. JSON fields are properly encoded
        4. Null parent_env_id is removed to preserve existing relationships
        """
        # Required fields with fallbacks
        data.setdefault("displayName", "Unnamed Environment")
        data.setdefault("envName", f"env-{uuid.uuid4().hex[:8]}")
        data.setdefault("status", "unknown")

        # Numeric field validation
        numeric_fields = ["uid", "creatorUid", "ownerUid"]
        for field in numeric_fields:
            if field in data and not isinstance(data[field], (int, type(None))):
                # Try to convert to int
                try:
                    data[field] = int(data[field])
                except (ValueError, TypeError):
                    logger.warning(
                        f"invalid_numeric_field",
                        field=field,
                        value=data[field]
                    )
                    data[field] = None

        # appid can be string or int - handle both
        if "appid" in data and not isinstance(data["appid"], (int, str, type(None))):
            data["appid"] = str(data["appid"])

        # JSON array fields - ensure valid or remove
        json_fields = ["extdomains", "envGroups"]
        for field in json_fields:
            if field in data:
                if data[field] is None:
                    # Remove null to preserve existing DB value
                    del data[field]
                elif isinstance(data[field], str):
                    # Validate JSON string
                    try:
                        json.loads(data[field])
                    except json.JSONDecodeError:
                        logger.warning(f"invalid_json_field", field=field)
                        del data[field]
                elif not isinstance(data[field], list):
                    logger.warning(f"invalid_array_field", field=field)
                    del data[field]

        # CRITICAL: Remove null parent_env_id to preserve existing relationships
        if "parent_env_id" in data and data["parent_env_id"] is None:
            del data["parent_env_id"]

        return data

    @staticmethod
    def validate_team_ownership(
        existing_env: Environment | None,
        sync_team_id: int,
        user_id: int
    ) -> int:
        """
        Validate and return correct team_id for environment.

        Rules:
        1. Existing env: Preserve original team_id (prevent cross-team overwrite)
        2. New env: Only allow if user owns the sync team
        3. Returns: The correct team_id to use
        """
        if existing_env:
            if existing_env.team_id != sync_team_id:
                logger.info(
                    "preserving_original_team_id",
                    env_name=existing_env.envName,
                    original_team_id=existing_env.team_id,
                    sync_team_id=sync_team_id
                )
            return existing_env.team_id

        # New environment - verify user owns team
        team = Team.query.get(sync_team_id)
        if not team or team.user_id != user_id:
            raise ValidationError(
                f"User {user_id} cannot create environments for team {sync_team_id}"
            )

        return sync_team_id

    @staticmethod
    def link_staging_parent(
        env_name: str,
        env_groups: list[str],
        team_id: int
    ) -> int | None:
        """
        For staging environments, find and link to production parent.

        Pattern: "staging7-myapp" -> parent is "myapp"
        """
        if "staging" not in env_groups:
            return None

        # Extract parent name (e.g., staging7-myapp -> myapp)
        parent_name = re.sub(r'^staging\d*-', '', env_name)
        if parent_name == env_name:
            return None  # Couldn't extract parent name

        parent_env = Environment.query.filter_by(
            team_id=team_id,
            envName=parent_name
        ).first()

        if parent_env:
            logger.info(
                "staging_parent_linked",
                staging_env=env_name,
                parent_env=parent_name,
                parent_id=parent_env.id
            )
            return parent_env.id

        logger.warning(
            "staging_parent_not_found",
            staging_env=env_name,
            expected_parent=parent_name
        )
        return None

Node Data Validation:

class NodeDataValidator:
    """Validate node data before upsert."""

    @staticmethod
    def sanitize_node_data(data: dict) -> dict:
        """Ensure node fields are valid types."""

        # node_id MUST be integer
        if "node_id" in data and not isinstance(data["node_id"], int):
            try:
                # Try to extract numeric part from mock IDs
                if isinstance(data["node_id"], str):
                    match = re.search(r'(\d+)$', data["node_id"])
                    if match:
                        data["node_id"] = int(match.group(1))
                    else:
                        raise ValueError("No numeric part found")
            except (ValueError, AttributeError):
                # Generate fallback ID
                data["node_id"] = random.randint(100000, 999999)
                logger.warning(
                    "generated_fallback_node_id",
                    original=data.get("node_id"),
                    fallback=data["node_id"]
                )

        # Cloudlet fields must be integers
        cloudlet_fields = ["fixedCloudlets", "flexibleCloudlets"]
        for field in cloudlet_fields:
            if field in data and not isinstance(data[field], (int, type(None))):
                try:
                    data[field] = int(data[field])
                except (ValueError, TypeError):
                    logger.warning(f"invalid_cloudlet_value", field=field)
                    data[field] = None

        # JSON fields
        json_fields = ["extIPs", "addons", "customitem"]
        for field in json_fields:
            if field in data and not isinstance(data[field], (str, type(None))):
                data[field] = json.dumps(data[field])

        return data

Sync Worker Flow Summary

async def execute_sync_job(job_payload: dict) -> None:
    """
    Main sync worker logic (simplified).

    Steps:
    1. Validate team membership
    2. Invalidate caches
    3. Decrypt session key (with self-healing)
    4. Fetch from Virtuozzo API (with cache-aside)
    5. Validate & map data
    6. Upsert environments and nodes
    7. Cleanup orphaned records
    8. Publish completion event
    """
    team_id = job_payload["team_id"]
    user_id = job_payload["user_id"]

    # 1. Validate team membership
    if not await team_auth_service.is_member(user_id, team_id):
        await complete_without_retry("Not team member")
        return

    # 2. Invalidate caches
    await invalidate_caches(team_id, user_id)

    # 3. Decrypt session key
    try:
        session_key = await decrypt_with_self_healing(
            job_payload["session_key_encrypted"],
            team_id,
            user_id
        )
    except CryptoError as e:
        await fail_job(str(e))
        return

    # 4. Fetch from API
    try:
        response = await virtuozzo_service.fetch_environments_and_nodes(
            session_key=session_key
        )
    except VirtuozzoAPIError as e:
        await fail_job_with_retry(str(e))
        return

    # 5-7. Process data
    infos = response.get("data", {}).get("infos", [])
    await sync_environments_and_nodes(team_id, user_id, infos)

    # 8. Publish success event
    await publish_sync_status_event(
        team_id=team_id,
        job_id=job_payload["job_id"],
        status="completed",
        env_count=len(infos)
    )

Freshness Metadata

Environment responses should include sync status:

def _serialize_environment(env: Environment) -> dict[str, Any]:
    return {
        "name": env.name,
        "display_name": env.display_name,
        "status": env.status,
        # ... other fields
        "sync_status": {
            "freshness": _calculate_freshness(env.synced_at),
            "last_synced_at": env.synced_at.isoformat() if env.synced_at else None,
            "is_stale": env.stale or False,
        }
    }

def _calculate_freshness(synced_at: datetime | None) -> str:
    if not synced_at:
        return "unknown"
    age = datetime.now(timezone.utc) - synced_at
    if age.total_seconds() < 600:  # < 10 minutes
        return "current"
    elif age.total_seconds() < 3600:  # < 1 hour
        return "recent"
    else:
        return "stale"

Triggering Sync on Demand

Members (with appropriate permissions) can trigger manual sync:

# POST /api/v1/teams/{team_id}/vz-sync
@router.post("/teams/{team_id}/vz-sync")
async def trigger_sync(
    team_id: int,
    current_user: Annotated[
        AuthenticatedUser,
        Depends(require_permission("team.manage"))
    ],
) -> dict[str, str]:
    if current_user.team_id != team_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Cannot manage different team.",
        )

    result = await trigger_environment_sync(
        team_id=team_id,
        user_id=current_user.user_id
    )

    return result  # {"job_id": "...", "status": "queued"}

Key Sync Worker Patterns (From Laravel Implementation)

The sync worker should implement these essential patterns:

1. Session Key Self-Healing - Try decryption with multiple strategies - Check if already in API format (167x...) - Check if already raw/unencrypted - Attempt Fernet decryption - Fallback: Self-heal from User or Environment models

2. Cache-Aside Strategy - Cache key: sync_data_{team_id} (10 minute TTL) - If no DB environments exist → force API call - Otherwise check cache first, call API if miss - Log API response metadata for debugging

3. Team Ownership Preservation - CRITICAL: Existing environments MUST preserve their original team_id - Only set team_id for NEW environments - Verify user owns team before creating new environments - Prevents cross-team data leakage during sync

4. Staging Environment Linking - Extract parent name from staging pattern: staging7-myappmyapp - Link parent_env_id to production environment - Preserve existing parent_env_id if API doesn't provide it

5. Orphaned Resource Cleanup - Only cleanup when doing FULL sync (not specific environment) - Only cleanup environments belonging to current team_id - Skip cleanup if environment has child staging environments - Delete in order: SFTP users → Backups → Nodes → Environment

6. Cache Invalidation - Invalidate BEFORE sync starts - Keys to clear: - sync_data_{team_id} - team_{team_id}_pending_sites_exist - team_{team_id}_site_creation_status_response - sync_status_{user_id} - DO NOT clear: sync_in_progress_{user_id} (only on completion)

7. Rate Limiting - Max 1 sync per team every 60 seconds - Use Redis SET with NX and EX for distributed locking - Backend processes can bypass via flag

8. Memory Management - Check memory usage periodically (default limit: 512 MB) - Fail gracefully if exceeded - Especially important for teams with 100+ environments

Freshness Metadata

Environment responses should include sync freshness:

def _serialize_environment(env: Environment) -> dict[str, Any]:
    return {
        "name": env.name,
        "display_name": env.display_name,
        "status": env.status,
        # ... other fields
        "sync_status": {
            "freshness": _calculate_freshness(env.synced_at),
            "last_synced_at": env.synced_at.isoformat() if env.synced_at else None,
            "is_stale": env.stale or False,
        }
    }

def _calculate_freshness(synced_at: datetime | None) -> str:
    if not synced_at:
        return "unknown"
    age = datetime.now(timezone.utc) - synced_at
    if age.total_seconds() < 600:  # < 10 minutes
        return "current"
    elif age.total_seconds() < 3600:  # < 1 hour
        return "recent"
    else:
        return "stale"

Triggering Sync on Demand

Members (with appropriate permissions) can trigger manual sync:

# POST /api/v1/teams/{team_id}/vz-sync
@router.post("/teams/{team_id}/vz-sync")
async def trigger_sync(
    team_id: int,
    current_user: Annotated[
        AuthenticatedUser,
        Depends(require_permission("team.manage"))
    ],
) -> dict[str, str]:
    if current_user.team_id != team_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Cannot manage different team.",
        )

    await sync_scheduler.force_sync(
        team_id=team_id,
        scope="full",
        reason="manual_api_request"
    )

    return {"status": "ok", "message": "Sync initiated"}

👥 Owner vs Member View Implementation

Unified View Model

Both owners and members see the same cached environment data. There is no separate "owner view" vs "member view" at the data level.

Differentiation happens at the permission level:

// frontend/src/features/environments/components/EnvironmentList.tsx
export function EnvironmentList({
  environments,
  currentUser
}: EnvironmentListProps) {
  const canCreate = currentUser.permissions.includes('server.create');
  const canDelete = currentUser.permissions.includes('server.delete');
  const canRestart = currentUser.permissions.includes('server.restart');

  return (
    <div>
      {canCreate && (
        <Button onClick={handleCreate}>Create Environment</Button>
      )}

      {environments.map(env => (
        <EnvironmentCard
          key={env.name}
          environment={env}
          actions={{
            restart: canRestart,
            delete: canDelete,
          }}
        />
      ))}
    </div>
  );
}

Permission-Based UI Elements

UI Element Visible To Condition
"Create Environment" button Members with server.create Default: Owner, Manager, Developer
"Delete" button Members with server.delete Default: Owner, Manager only
"Restart" button Members with server.restart Default: Owner, Manager, Developer
"Manage Billing" link Members with billing.edit Default: Owner, Manager only
"Sync Now" button Members with team.manage Default: Owner only

Frontend Implementation Pattern

// frontend/src/features/environments/hooks/useEnvironmentPermissions.ts
export function useEnvironmentPermissions() {
  const { user } = useAuth();

  return {
    canListEnvironments: true, // All team members
    canViewDetails: true,
    canCreate: user.permissions.includes('server.create'),
    canRestart: user.permissions.includes('server.restart'),
    canDelete: user.permissions.includes('server.delete'),
    canManageSync: user.permissions.includes('team.manage'),

    // Derived permissions
    hasFullAccess: user.role_name === 'Owner',
    isReadOnly: !user.permissions.some(p =>
      p.startsWith('server.') && p !== 'server.view'
    ),
  };
}

🔨 Implementation Checklist

Backend Tasks

  • Repository Layer (backend/app/domains/environments/repository.py)
  • list_team_environments(team_id) with sync metadata
  • get_environment_by_name(team_id, env_name)
  • shortdomain_exists(team_id, shortdomain)
  • has_active_job_for_shortdomain(team_id, shortdomain)
  • mark_environment_stale(team_id, env_name)

  • Service Layer (backend/app/domains/environments/service.py)

  • list_environments() - Read from cache with freshness
  • get_environment_details() - Single environment view
  • enqueue_environment_creation() - Job dispatch
  • Include sync status in all responses

  • Router (backend/app/api/v1/environments.py)

  • GET /environments - List (team membership required)
  • GET /environments/{env_name} - Details (team membership)
  • POST /environments - Create (server.create permission)
  • DELETE /environments/{env_name} - Delete (server.delete permission)
  • POST /environments/{env_name}/restart - Restart (server.restart permission)

  • Job Worker (backend/app/domains/environments/worker.py)

  • Decrypt owner session key
  • Call Virtuozzo API with owner credentials
  • Poll for environment status
  • Publish SSE events for job progress
  • Insert environment record on success
  • Log failures with structured logging

  • Sync Integration

  • Extend vz_sync_states to track environment sync
  • Worker updates synced_at and source_version on each sync
  • Manual sync endpoint with team.manage permission

Frontend Tasks

  • Type Definitions (frontend/src/features/environments/types/environment.types.ts)
  • Environment interface with sync metadata
  • EnvironmentCreateRequest schema
  • EnvironmentJobStatus enum
  • SyncStatus interface

  • API Client (frontend/src/features/environments/services/environmentService.ts)

  • getEnvironments()
  • getEnvironmentDetails(envName)
  • createEnvironment(payload)
  • deleteEnvironment(envName)
  • restartEnvironment(envName)
  • triggerSync()

  • Components

  • EnvironmentList - Table with sync badges
  • EnvironmentCard - Individual environment display
  • CreateEnvironmentDialog - Form with validation
  • EnvironmentSyncBadge - Freshness indicator
  • JobStatusTracker - SSE-based progress display

  • Permission Guards

  • useEnvironmentPermissions() hook
  • Conditional rendering based on permissions
  • Disable actions for insufficient permissions

🔍 Testing Strategy

Backend Tests

  1. Unit Tests (backend/tests/unit/domains/environments/)
  2. test_list_environments_team_scoping()
  3. test_create_environment_permission_check()
  4. test_create_environment_validates_vz_credentials()
  5. test_sync_status_calculation()

  6. Integration Tests (backend/tests/integration/domains/environments/)

  7. test_create_environment_job_flow_e2e()
  8. test_member_access_via_team_membership()
  9. test_sync_updates_environment_freshness()

  10. Permission Tests

  11. test_member_cannot_delete_without_permission()
  12. test_owner_has_wildcard_access()
  13. test_custom_role_with_server_create()

Frontend Tests

  1. Component Tests
  2. Permission-based rendering
  3. Create dialog validation
  4. SSE job status updates

  5. Integration Tests

  6. E2E environment creation flow
  7. Permission checks across member types
  8. Sync trigger and status update

📚 Reference Documentation


🚨 Common Pitfalls

1. Direct Virtuozzo Calls from Frontend

❌ Wrong:

// Frontend code
const response = await fetch('https://virtuozzo.api.com/environments', {
  headers: { 'X-Session': ownerSessionKey }
});

✅ Correct:

// Frontend code
const environments = await apiClient.get('/api/v1/environments');
// Backend proxies the call with owner credentials

2. Missing Team Context Validation

❌ Wrong:

async def list_environments(current_user: AuthenticatedUser):
    # Directly querying without team check
    return await db.execute(select(Environment))

✅ Correct:

async def list_environments(current_user: AuthenticatedUser):
    if current_user.team_id is None:
        raise HTTPException(403, "Missing team context")
    return await repository.list_team_environments(
        team_id=current_user.team_id
    )

3. Forgetting Permission Checks

❌ Wrong:

@router.delete("/environments/{env_name}")
async def delete_environment(
    env_name: str,
    current_user: AuthenticatedUser = Depends(get_current_user)
):
    # Anyone can delete!
    ...

✅ Correct:

@router.delete("/environments/{env_name}")
async def delete_environment(
    env_name: str,
    current_user: AuthenticatedUser = Depends(
        require_permission("server.delete")
    )
):
    ...

4. Not Including Sync Metadata

❌ Wrong:

return {"items": [{"name": env.name, "status": env.status}]}

✅ Correct:

return {
    "items": [{
        "name": env.name,
        "status": env.status,
        "sync_status": {
            "freshness": "current",
            "last_synced_at": env.synced_at
        }
    }]
}


💡 Best Practices

  1. Always validate team context before querying environments
  2. Use require_permission() dependency for all mutating operations
  3. Include sync metadata in all environment responses
  4. Log security violations when permission checks fail
  5. Use job system for any operation that calls Virtuozzo (>2s response time)
  6. Decrypt credentials in service layer, never pass encrypted keys to workers
  7. Publish SSE events for async job status updates
  8. Test with multiple roles (Owner, Manager, Developer) during development
  9. Document permission requirements in API docstrings
  10. Follow existing patterns from team management and RBAC implementations

📞 Support & Questions

For implementation questions or clarifications: - Review existing implementations in backend/app/domains/environments/ - Check RBAC patterns in backend/app/modules/rbac/ - Reference team management implementation in backend/app/modules/team_management/ - Consult sync architecture doc for job system patterns