|
|
"""
|
|
|
R2 Storage integration for OpenManus
|
|
|
Provides interface to Cloudflare R2 storage operations
|
|
|
"""
|
|
|
|
|
|
import io
from typing import Any, BinaryIO, Dict, List, Optional
from urllib.parse import urlencode

from app.logger import logger

from .client import CloudflareClient, CloudflareError
|
|
|
|
|
|
|
|
|
class R2Storage:
    """Cloudflare R2 Storage client.

    Wraps a ``CloudflareClient`` to perform object operations against a
    primary "storage" bucket and an optional dedicated "assets" bucket.
    """

    def __init__(
        self,
        client: CloudflareClient,
        storage_bucket: str,
        assets_bucket: Optional[str] = None,
    ):
        """Initialize the R2 storage wrapper.

        Args:
            client: Configured Cloudflare API client.
            storage_bucket: Name of the primary storage bucket.
            assets_bucket: Optional dedicated assets bucket; when omitted
                (or empty) the storage bucket serves both roles.
        """
        self.client = client
        self.storage_bucket = storage_bucket
        # Fall back to the storage bucket when no usable assets bucket is given.
        if not assets_bucket:
            assets_bucket = storage_bucket
        self.assets_bucket = assets_bucket
        # Base path of the Cloudflare REST API for R2 bucket operations.
        self.base_endpoint = f"accounts/{client.account_id}/r2/buckets"
|
|
|
|
|
|
def _get_bucket_name(self, bucket_type: str = "storage") -> str:
|
|
|
"""Get bucket name based on type"""
|
|
|
if bucket_type == "assets":
|
|
|
return self.assets_bucket
|
|
|
return self.storage_bucket
|
|
|
|
|
|
async def upload_file(
|
|
|
self,
|
|
|
key: str,
|
|
|
file_data: bytes,
|
|
|
content_type: str = "application/octet-stream",
|
|
|
bucket_type: str = "storage",
|
|
|
metadata: Optional[Dict[str, str]] = None,
|
|
|
use_worker: bool = True,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Upload a file to R2"""
|
|
|
|
|
|
bucket_name = self._get_bucket_name(bucket_type)
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
|
|
|
form_data = {
|
|
|
"file": file_data,
|
|
|
"bucket": bucket_type,
|
|
|
"key": key,
|
|
|
"contentType": content_type,
|
|
|
}
|
|
|
|
|
|
if metadata:
|
|
|
form_data["metadata"] = metadata
|
|
|
|
|
|
response = await self.client.post(
|
|
|
"api/files", data=form_data, use_worker=True
|
|
|
)
|
|
|
else:
|
|
|
|
|
|
headers = {"Content-Type": content_type}
|
|
|
|
|
|
if metadata:
|
|
|
for k, v in metadata.items():
|
|
|
headers[f"x-amz-meta-{k}"] = v
|
|
|
|
|
|
response = await self.client.upload_file(
|
|
|
f"{self.base_endpoint}/{bucket_name}/objects/{key}",
|
|
|
file_data,
|
|
|
content_type,
|
|
|
headers,
|
|
|
)
|
|
|
|
|
|
return {
|
|
|
"success": True,
|
|
|
"key": key,
|
|
|
"bucket": bucket_type,
|
|
|
"bucket_name": bucket_name,
|
|
|
"size": len(file_data),
|
|
|
"content_type": content_type,
|
|
|
"url": f"/{bucket_type}/{key}",
|
|
|
**response,
|
|
|
}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"R2 upload failed: {e}")
|
|
|
raise
|
|
|
|
|
|
async def upload_file_stream(
|
|
|
self,
|
|
|
key: str,
|
|
|
file_stream: BinaryIO,
|
|
|
content_type: str = "application/octet-stream",
|
|
|
bucket_type: str = "storage",
|
|
|
metadata: Optional[Dict[str, str]] = None,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Upload a file from stream"""
|
|
|
|
|
|
file_data = file_stream.read()
|
|
|
return await self.upload_file(
|
|
|
key, file_data, content_type, bucket_type, metadata
|
|
|
)
|
|
|
|
|
|
async def get_file(
|
|
|
self, key: str, bucket_type: str = "storage", use_worker: bool = True
|
|
|
) -> Optional[Dict[str, Any]]:
|
|
|
"""Get a file from R2"""
|
|
|
|
|
|
bucket_name = self._get_bucket_name(bucket_type)
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
response = await self.client.get(
|
|
|
f"api/files/{key}?bucket={bucket_type}", use_worker=True
|
|
|
)
|
|
|
|
|
|
if response:
|
|
|
return {
|
|
|
"key": key,
|
|
|
"bucket": bucket_type,
|
|
|
"bucket_name": bucket_name,
|
|
|
"data": response,
|
|
|
"exists": True,
|
|
|
}
|
|
|
else:
|
|
|
response = await self.client.get(
|
|
|
f"{self.base_endpoint}/{bucket_name}/objects/{key}"
|
|
|
)
|
|
|
|
|
|
return {
|
|
|
"key": key,
|
|
|
"bucket": bucket_type,
|
|
|
"bucket_name": bucket_name,
|
|
|
"data": response,
|
|
|
"exists": True,
|
|
|
}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
if e.status_code == 404:
|
|
|
return None
|
|
|
logger.error(f"R2 get file failed: {e}")
|
|
|
raise
|
|
|
|
|
|
return None
|
|
|
|
|
|
async def delete_file(
|
|
|
self, key: str, bucket_type: str = "storage", use_worker: bool = True
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Delete a file from R2"""
|
|
|
|
|
|
bucket_name = self._get_bucket_name(bucket_type)
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
response = await self.client.delete(
|
|
|
f"api/files/{key}?bucket={bucket_type}", use_worker=True
|
|
|
)
|
|
|
else:
|
|
|
response = await self.client.delete(
|
|
|
f"{self.base_endpoint}/{bucket_name}/objects/{key}"
|
|
|
)
|
|
|
|
|
|
return {
|
|
|
"success": True,
|
|
|
"key": key,
|
|
|
"bucket": bucket_type,
|
|
|
"bucket_name": bucket_name,
|
|
|
**response,
|
|
|
}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"R2 delete failed: {e}")
|
|
|
raise
|
|
|
|
|
|
async def list_files(
|
|
|
self,
|
|
|
bucket_type: str = "storage",
|
|
|
prefix: str = "",
|
|
|
limit: int = 1000,
|
|
|
use_worker: bool = True,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""List files in R2 bucket"""
|
|
|
|
|
|
bucket_name = self._get_bucket_name(bucket_type)
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
params = {"bucket": bucket_type, "prefix": prefix, "limit": limit}
|
|
|
|
|
|
query_string = "&".join([f"{k}={v}" for k, v in params.items() if v])
|
|
|
response = await self.client.get(
|
|
|
f"api/files/list?{query_string}", use_worker=True
|
|
|
)
|
|
|
else:
|
|
|
params = {"prefix": prefix, "max-keys": limit}
|
|
|
|
|
|
query_string = "&".join([f"{k}={v}" for k, v in params.items() if v])
|
|
|
response = await self.client.get(
|
|
|
f"{self.base_endpoint}/{bucket_name}/objects?{query_string}"
|
|
|
)
|
|
|
|
|
|
return {
|
|
|
"bucket": bucket_type,
|
|
|
"bucket_name": bucket_name,
|
|
|
"prefix": prefix,
|
|
|
"files": response.get("objects", []),
|
|
|
"truncated": response.get("truncated", False),
|
|
|
**response,
|
|
|
}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"R2 list files failed: {e}")
|
|
|
raise
|
|
|
|
|
|
async def get_file_metadata(
|
|
|
self, key: str, bucket_type: str = "storage", use_worker: bool = True
|
|
|
) -> Optional[Dict[str, Any]]:
|
|
|
"""Get file metadata without downloading content"""
|
|
|
|
|
|
bucket_name = self._get_bucket_name(bucket_type)
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
response = await self.client.get(
|
|
|
f"api/files/{key}/metadata?bucket={bucket_type}", use_worker=True
|
|
|
)
|
|
|
else:
|
|
|
|
|
|
response = await self.client.get(
|
|
|
f"{self.base_endpoint}/{bucket_name}/objects/{key}",
|
|
|
headers={"Range": "bytes=0-0"},
|
|
|
)
|
|
|
|
|
|
if response:
|
|
|
return {
|
|
|
"key": key,
|
|
|
"bucket": bucket_type,
|
|
|
"bucket_name": bucket_name,
|
|
|
**response,
|
|
|
}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
if e.status_code == 404:
|
|
|
return None
|
|
|
logger.error(f"R2 get metadata failed: {e}")
|
|
|
raise
|
|
|
|
|
|
return None
|
|
|
|
|
|
async def copy_file(
|
|
|
self,
|
|
|
source_key: str,
|
|
|
destination_key: str,
|
|
|
source_bucket: str = "storage",
|
|
|
destination_bucket: str = "storage",
|
|
|
use_worker: bool = True,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Copy a file within R2 or between buckets"""
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
copy_data = {
|
|
|
"sourceKey": source_key,
|
|
|
"destinationKey": destination_key,
|
|
|
"sourceBucket": source_bucket,
|
|
|
"destinationBucket": destination_bucket,
|
|
|
}
|
|
|
|
|
|
response = await self.client.post(
|
|
|
"api/files/copy", data=copy_data, use_worker=True
|
|
|
)
|
|
|
else:
|
|
|
|
|
|
source_file = await self.get_file(source_key, source_bucket, False)
|
|
|
|
|
|
if not source_file:
|
|
|
raise CloudflareError(f"Source file {source_key} not found")
|
|
|
|
|
|
|
|
|
response = await self.upload_file(
|
|
|
destination_key,
|
|
|
source_file["data"],
|
|
|
bucket_type=destination_bucket,
|
|
|
use_worker=False,
|
|
|
)
|
|
|
|
|
|
return {
|
|
|
"success": True,
|
|
|
"source_key": source_key,
|
|
|
"destination_key": destination_key,
|
|
|
"source_bucket": source_bucket,
|
|
|
"destination_bucket": destination_bucket,
|
|
|
**response,
|
|
|
}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"R2 copy failed: {e}")
|
|
|
raise
|
|
|
|
|
|
    async def move_file(
        self,
        source_key: str,
        destination_key: str,
        source_bucket: str = "storage",
        destination_bucket: str = "storage",
        use_worker: bool = True,
    ) -> Dict[str, Any]:
        """Move a file by copying it and then deleting the original.

        Not atomic: if the delete fails after a successful copy, the object
        exists in both locations and the CloudflareError propagates.

        Args:
            source_key: Key of the object to move.
            destination_key: Key to write the object under.
            source_bucket: Logical bucket of the source ("storage"/"assets").
            destination_bucket: Logical bucket of the destination.
            use_worker: Forwarded to both copy_file and delete_file.

        Returns:
            Dict with the individual copy and delete results.

        Raises:
            CloudflareError: If either the copy or the delete fails.
        """
        try:
            # Step 1: copy the object to its destination.
            copy_result = await self.copy_file(
                source_key,
                destination_key,
                source_bucket,
                destination_bucket,
                use_worker,
            )

            # Step 2: remove the source object.
            delete_result = await self.delete_file(
                source_key, source_bucket, use_worker
            )

            return {
                "success": True,
                "source_key": source_key,
                "destination_key": destination_key,
                "source_bucket": source_bucket,
                "destination_bucket": destination_bucket,
                "copy_result": copy_result,
                "delete_result": delete_result,
            }

        except CloudflareError as e:
            logger.error(f"R2 move failed: {e}")
            raise
|
|
|
|
|
|
async def generate_presigned_url(
|
|
|
self,
|
|
|
key: str,
|
|
|
bucket_type: str = "storage",
|
|
|
expires_in: int = 3600,
|
|
|
method: str = "GET",
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Generate a presigned URL for direct access"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
url_data = {
|
|
|
"key": key,
|
|
|
"bucket": bucket_type,
|
|
|
"expiresIn": expires_in,
|
|
|
"method": method,
|
|
|
}
|
|
|
|
|
|
response = await self.client.post(
|
|
|
"api/files/presigned-url", data=url_data, use_worker=True
|
|
|
)
|
|
|
|
|
|
return {
|
|
|
"success": True,
|
|
|
"key": key,
|
|
|
"bucket": bucket_type,
|
|
|
"method": method,
|
|
|
"expires_in": expires_in,
|
|
|
**response,
|
|
|
}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"R2 presigned URL generation failed: {e}")
|
|
|
raise
|
|
|
|
|
|
async def get_storage_stats(self, use_worker: bool = True) -> Dict[str, Any]:
|
|
|
"""Get storage statistics"""
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
response = await self.client.get("api/files/stats", use_worker=True)
|
|
|
else:
|
|
|
|
|
|
storage_list = await self.list_files("storage", use_worker=False)
|
|
|
assets_list = await self.list_files("assets", use_worker=False)
|
|
|
|
|
|
storage_size = sum(
|
|
|
file.get("size", 0) for file in storage_list.get("files", [])
|
|
|
)
|
|
|
assets_size = sum(
|
|
|
file.get("size", 0) for file in assets_list.get("files", [])
|
|
|
)
|
|
|
|
|
|
response = {
|
|
|
"storage": {
|
|
|
"file_count": len(storage_list.get("files", [])),
|
|
|
"total_size": storage_size,
|
|
|
},
|
|
|
"assets": {
|
|
|
"file_count": len(assets_list.get("files", [])),
|
|
|
"total_size": assets_size,
|
|
|
},
|
|
|
"total": {
|
|
|
"file_count": len(storage_list.get("files", []))
|
|
|
+ len(assets_list.get("files", [])),
|
|
|
"total_size": storage_size + assets_size,
|
|
|
},
|
|
|
}
|
|
|
|
|
|
return response
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"R2 storage stats failed: {e}")
|
|
|
raise
|
|
|
|
|
|
def create_file_stream(self, data: bytes) -> io.BytesIO:
|
|
|
"""Create a file stream from bytes"""
|
|
|
return io.BytesIO(data)
|
|
|
|
|
|
def get_public_url(self, key: str, bucket_type: str = "storage") -> str:
|
|
|
"""Get public URL for a file (if bucket is configured for public access)"""
|
|
|
bucket_name = self._get_bucket_name(bucket_type)
|
|
|
|
|
|
|
|
|
|
|
|
if self.client.worker_url:
|
|
|
return f"{self.client.worker_url}/api/files/{key}?bucket={bucket_type}"
|
|
|
|
|
|
|
|
|
return f"https://pub-{bucket_name}.r2.dev/{key}"
|
|
|
|