import logging
import os

import boto3

_logger = logging.getLogger(__name__)

# In-memory cache for workspace files (lives for the duration of the warm session)
_cache: dict[str, str] = {}

# Lazily created S3 client; populated on first call to _get_s3().
_s3 = None

# Persona files fetched by load_persona_files().
_PERSONA_FILES = ('SOUL.md', 'AGENTS.md', 'IDENTITY.md', 'USER.md')


def _get_s3():
    """Return the module-wide S3 client, creating it on first use."""
    global _s3
    if _s3 is None:
        _s3 = boto3.client('s3')
    return _s3


def get_bucket() -> str:
    """Return the workspace bucket name.

    Reads the ``WORKSPACE_BUCKET_NAME`` environment variable on every call
    and falls back to a hard-coded default bucket when it is unset.
    """
    return os.environ.get('WORKSPACE_BUCKET_NAME', 'agent-claw-workspace-495395224548')


def read_file(path: str) -> str:
    """Read a workspace file from S3 (cached).

    The first read of ``path`` fetches the object and decodes it as UTF-8;
    later reads return the cached text without contacting S3. Entries are
    never invalidated here, so changes made to the object outside this
    module are not seen once it has been cached.

    Exceptions raised by the S3 client, and ``UnicodeDecodeError`` for
    non-UTF-8 content, propagate to the caller.
    """
    if path not in _cache:
        resp = _get_s3().get_object(Bucket=get_bucket(), Key=path)
        _cache[path] = resp['Body'].read().decode('utf-8')
    return _cache[path]


def write_file(path: str, content: str) -> str:
    """Write a workspace file to S3 and update cache.

    The content is uploaded UTF-8 encoded with content type
    ``text/markdown``. The cache is updated only after the upload call
    returns, so a failed upload leaves the cached value untouched.

    Returns a status message reporting the number of bytes uploaded.
    """
    # Encode once so the reported size is the real byte count, not the
    # character count (they differ for non-ASCII text).
    payload = content.encode('utf-8')
    _get_s3().put_object(
        Bucket=get_bucket(),
        Key=path,
        Body=payload,
        ContentType='text/markdown',
    )
    _cache[path] = content
    return f"Written {len(payload)} bytes to {path}"


def load_persona_files() -> dict[str, str]:
    """Load all persona files at session start (SOUL.md etc.)

    Loading is best-effort: a file that cannot be read is omitted from the
    result and the failure is logged, so one missing or unreadable file
    does not prevent the others from loading.

    Returns a dict mapping file name to file content for each file that
    was read successfully.
    """
    files: dict[str, str] = {}
    for fname in _PERSONA_FILES:
        try:
            files[fname] = read_file(fname)
        except Exception as exc:
            # Deliberately broad to keep this best-effort, but surface the
            # reason instead of hiding it (e.g. credentials/permissions).
            _logger.warning("Could not load persona file %s: %s", fname, exc)
    return files