import base64
import hashlib
import json
import time
import urllib.error
import urllib.request

# Default lifetime, in seconds, of a signed CDN URL produced by get_url().
_SIGNED_URL_EXPIRY_SECONDS = 3600


class BunnyClient:
    """Small client for a Bunny storage zone and the CDN in front of it.

    Storage operations (list, upload, delete) are plain HTTP requests against
    ``{storage_endpoint}/{zone}/{path}`` authenticated with an ``AccessKey``
    header. URL helpers build either a public CDN URL or a token-signed one.

    HTTP error responses are reported with ``print`` and turned into a falsy
    return value. Lower-level failures (``urllib.error.URLError`` that is not
    an ``HTTPError``, e.g. DNS or connection errors) are not caught and
    propagate to the caller.
    """

    def __init__(
        self,
        zone: str,
        api_key: str,
        cdn_base_url: str,
        token_auth_key: str,
        storage_endpoint: str,
    ) -> None:
        """Store the connection settings.

        Args:
            zone: Storage zone name, used as the first URL path segment.
            api_key: Value sent in the ``AccessKey`` request header.
            cdn_base_url: Base URL of the CDN; trailing slashes are stripped.
            token_auth_key: Secret used to sign URLs in :meth:`get_url`.
            storage_endpoint: Base URL of the storage API; trailing slashes
                are stripped.
        """
        self._zone = zone
        self._api_key = api_key
        self._cdn_base_url = cdn_base_url.rstrip("/")
        self._token_auth_key = token_auth_key
        self._storage_endpoint = storage_endpoint.rstrip("/")

    def _storage_url(self, path: str) -> str:
        """Return the storage API URL for *path* inside the configured zone."""
        return f"{self._storage_endpoint}/{self._zone}/{path.lstrip('/')}"

    def _directory_url(self, path: str) -> str:
        """Return the storage URL for directory *path*, ending in ``/``.

        The listing endpoint is documented with a trailing slash; without it
        the request addresses a file rather than a directory.
        """
        url = self._storage_url(path)
        return url if url.endswith("/") else f"{url}/"

    def list_directory(self, path: str) -> list[str]:
        """List the contents of directory *path* in the storage zone.

        Returns:
            The decoded JSON body on HTTP 200, otherwise ``[]`` (unexpected
            status or ``HTTPError``).
            NOTE(review): the value is whatever JSON the endpoint sends; the
            ``list[str]`` annotation is kept from the original - confirm the
            element type against the API response.

        Raises:
            urllib.error.URLError: On network-level failures (not caught).
            json.JSONDecodeError: If a 200 response body is not valid JSON.
        """
        url = self._directory_url(path)
        print(f"Listing directories in: {url}")
        req = urllib.request.Request(
            url,
            method="GET",
            headers={"AccessKey": self._api_key},
        )
        try:
            with urllib.request.urlopen(req) as resp:
                if resp.status != 200:
                    print(f"Unexpected response status {resp.status} when listing directory: {resp.read().decode()}")
                    return []
                print(f"Successfully listed directory '{path}' with Bunny storage client.")
                response_content = resp.read().decode()
                print(f"Response content: {response_content}")
                return json.loads(response_content)
        except urllib.error.HTTPError as e:
            print(f"HTTPError when listing directory: {e}")
            return []

    def upload(self, path: str, data: bytes, *, content_type: str = "audio/wav") -> bool:
        """Upload *data* to *path* with an HTTP PUT.

        Args:
            path: Destination path inside the storage zone.
            data: Raw bytes to store.
            content_type: Value of the ``Content-Type`` header. Defaults to
                ``"audio/wav"``, the value that was previously hard-coded.

        Returns:
            ``True`` only when the server answers 201; ``False`` on any other
            status or on ``HTTPError``.

        Raises:
            urllib.error.URLError: On network-level failures (not caught).
        """
        req = urllib.request.Request(
            self._storage_url(path),
            data=data,
            method="PUT",
            headers={
                "AccessKey": self._api_key,
                "Content-Type": content_type,
            },
        )
        try:
            with urllib.request.urlopen(req) as resp:
                if resp.status == 201:
                    print(f"Successfully uploaded '{path}' with Bunny storage client.")
                    return True
                print(f"Unexpected response status {resp.status} when uploading '{path}': {resp.read().decode()}")
                return False
        except urllib.error.HTTPError as e:
            print(f"HTTPError when uploading '{path}': {e}")
            return False

    def get_url(self, path: str, *, expires_in: int = _SIGNED_URL_EXPIRY_SECONDS) -> str:
        """Return a token-signed CDN URL for *path*.

        The token is the SHA-256 digest of ``token_auth_key + url_path +
        expiration``, base64-encoded with the URL-safe alphabet and with the
        ``=`` padding removed.

        Args:
            path: Object path; a single leading ``/`` is enforced.
            expires_in: Lifetime of the URL in seconds, counted from now.

        Returns:
            ``{cdn_base_url}{url_path}?token=...&expires=<unix timestamp>``.
        """
        url_path = f"/{path.lstrip('/')}"
        expiration = int(time.time()) + expires_in
        digest = hashlib.sha256(
            (self._token_auth_key + url_path + str(expiration)).encode()
        ).digest()
        # urlsafe_b64encode maps "+" -> "-" and "/" -> "_"; "=" only ever
        # appears as trailing padding, so rstrip removes all of it.
        token = base64.urlsafe_b64encode(digest).decode().rstrip("=")
        return f"{self._cdn_base_url}{url_path}?token={token}&expires={expiration}"

    def get_public_url(self, path: str) -> str:
        """Return the unsigned CDN URL for *path*."""
        return f"{self._cdn_base_url}/{path.lstrip('/')}"

    def delete(self, path: str) -> bool:
        """Delete *path* from the storage zone.

        Returns:
            ``True`` when the server answers 200; ``False`` on any other
            status or on ``HTTPError``.

        Raises:
            urllib.error.URLError: On network-level failures (not caught).
        """
        req = urllib.request.Request(
            self._storage_url(path),
            method="DELETE",
            headers={"AccessKey": self._api_key},
        )
        try:
            with urllib.request.urlopen(req) as resp:
                return resp.status == 200
        except urllib.error.HTTPError:
            return False

    def download(self, path: str) -> tuple[bytes, str]:
        """Not supported by this client; always raises.

        Raises:
            NotImplementedError: Always. Use :meth:`get_url` to obtain a
                signed CDN URL instead.
        """
        raise NotImplementedError(
            "Direct download not available with Bunny — use get_url() to obtain a signed CDN URL"
        )