Spaces:
Running
Running
| """COS utilities.""" | |
| import logging | |
| import os | |
| import tempfile | |
| from io import BufferedReader | |
| from typing import List, Optional, Tuple | |
| from urllib.parse import urlparse | |
| import boto3 | |
| from boto3_type_annotations.s3 import Bucket | |
| from botocore.client import Config | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def connect_bucket(s3_uri: str) -> Tuple[Bucket, List[str]]:
    """Connect to the bucket addressed by a credential-bearing S3 URI.

    The URI is expected to have the form
    ``s3://<access>:<secret>@<host>/<bucket>[/<key>/<parts>]``. The connection
    is opened against ``http://<host>`` with signature version ``s3v4`` and
    region ``us-east-1``.

    Args:
        s3_uri (str): URI carrying the credentials, the host, the bucket name
            and, optionally, an object key.

    Returns:
        Tuple[Bucket, List[str]]: the bucket resource and the object key split
        on "/" (an empty list when the URI addresses only the bucket).

    Raises:
        ValueError: if the URI lacks credentials, a host or a bucket name.
    """
    # NOTE: error messages below never echo the URI, since it embeds the secret.
    parsed_uri = urlparse(s3_uri)

    # Split on the LAST "@" so that a stray "@" inside the credentials does not
    # break the host extraction.
    credentials, at_sign, host = parsed_uri.netloc.rpartition("@")
    if not at_sign or not host:
        raise ValueError(
            "S3 URI must provide credentials and a host as "
            "s3://<access>:<secret>@<host>/<bucket>"
        )

    # Split on the FIRST ":" so that a ":" inside the secret is preserved.
    access, colon, secret = credentials.partition(":")
    if not colon:
        raise ValueError(
            "S3 URI credentials must be given as <access>:<secret> before the host"
        )

    # The path looks like "/<bucket>/<key parts...>"; the leading element
    # produced by the split is the empty string before the first "/".
    path_parts = parsed_uri.path.split("/")
    if len(path_parts) < 2 or not path_parts[1]:
        raise ValueError("S3 URI must name a bucket after the host")
    bucket_name = path_parts[1]
    split_key = path_parts[2:]

    # establish connection
    connection = boto3.resource(
        "s3",
        endpoint_url="http://{}".format(host),
        aws_access_key_id=access,
        aws_secret_access_key=secret,
        config=Config(signature_version="s3v4"),
        region_name="us-east-1",
    )
    return connection.Bucket(bucket_name), split_key
def _redact_credentials(uri: str) -> str:
    """Return ``uri`` with any ``<user>:<password>@`` part replaced by ``***@``."""
    scheme, scheme_separator, remainder = uri.partition("://")
    if not scheme_separator:
        return uri
    authority, slash, tail = remainder.partition("/")
    _, at_sign, host = authority.rpartition("@")
    if not at_sign:
        return uri
    return f"{scheme}://***@{host}{slash}{tail}"


def ensure_filepath_from_uri(file_uri: str) -> str:
    """
    Get a file on the local storage.

    In case the file_uri provided is a S3 URI, downloads the
    file to a temporary location and returns the local path. The temporary
    file is not deleted automatically on success; on failure it is removed.

    Args:
        file_uri (str): a uri, either filesystem or S3.

    Returns:
        str: the path to the file on the local filesystem.

    Raises:
        RuntimeError: if the S3 download fails for any reason, or if a local
            path does not exist.
    """
    if file_uri.startswith("s3://"):
        local_path = None
        try:
            bucket, split_key = connect_bucket(file_uri)
            # S3 keys always use "/" as separator, independent of the local OS,
            # and empty segments are significant, so do not use os.path.join.
            object_key = "/".join(split_key)
            if not object_key:
                raise ValueError("the S3 URI does not contain an object key")
            # Reserve a local path; the handle is closed right away so that the
            # download can (re)open the file by name.
            with tempfile.NamedTemporaryFile(delete=False) as handle:
                local_path = handle.name
            bucket.download_file(object_key, local_path)
            return local_path
        except Exception as error:
            if local_path is not None:
                # Best-effort cleanup so failed downloads do not leave
                # temporary files behind.
                try:
                    os.remove(local_path)
                except OSError:
                    pass
            # The URI embeds the access and secret keys: never log or raise it
            # verbatim.
            message = "Getting file from COS failed for the provided URI: {}".format(
                _redact_credentials(file_uri)
            )
            logger.exception(message)
            raise RuntimeError(message) from error

    logger.debug("Searching for %s", file_uri)
    if os.path.exists(file_uri):
        return file_uri
    message = "File not found on local filesystem."
    logger.error(message)
    raise RuntimeError(message)
# COS configuration, overridable through environment variables.
# Without COS_BUCKET_URI set, the value falls back to an "artifacts" folder
# under the working directory at import time.
COS_BUCKET_URI = os.getenv("COS_BUCKET_URI", os.path.join(os.getcwd(), "artifacts"))
# Upload policy name; falls back to "public-read-write" when unset.
COS_UPLOAD_POLICY = os.getenv("COS_UPLOAD_POLICY", "public-read-write")
# Prefix used for results.
RESULTS_PREFIX = "results"
def download_from_key(key: str, file_path: Optional[str] = None) -> None:
    """Download a single file from COS.

    If no file_path is given, object name is taken as relative local path.
    Missing parent directories of the target path are created first.

    Args:
        key (str): S3 key.
        file_path (str, optional): Path of downloaded file. Defaults to None.
    """
    target_path = key if file_path is None else file_path
    parent_directory = os.path.dirname(target_path)
    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create directories when there is one.
    if parent_directory:
        os.makedirs(parent_directory, exist_ok=True)
    # NOTE(review): BUCKET is not defined in the visible part of this module;
    # presumably a boto3 Bucket created elsewhere - confirm.
    BUCKET.download_file(key, target_path)
def upload_to_key(file_path: str, key: str) -> None:
    """Upload local file to COS.

    Delegates to ``BUCKET.upload_file``.

    Args:
        file_path (str): Local filepath.
        key (str): S3 key.
    """
    # NOTE(review): BUCKET is not defined in the visible part of this module;
    # presumably a boto3 Bucket created elsewhere - confirm.
    BUCKET.upload_file(file_path, key)
def fileobject_to_key(readable_binary: BufferedReader, key: str) -> None:
    """Upload readable, binary file from handle to COS.

    Delegates to ``BUCKET.upload_fileobj``. The handle is not closed here.

    Args:
        readable_binary (BufferedReader): filehandle, e.g. opened in 'rb' mode.
        key (str): S3 key.
    """
    # NOTE(review): BUCKET is not defined in the visible part of this module;
    # presumably a boto3 Bucket created elsewhere - confirm.
    BUCKET.upload_fileobj(readable_binary, key)
def delete_from_key(key_or_prefix: str) -> None:
    """Delete every COS object whose key matches the given prefix.

    Args:
        key_or_prefix (str): full object key or key prefix, passed as the
            ``Prefix`` filter.
    """
    # NOTE(review): the prefix is forwarded as-is; presumably an empty string
    # matches every object in the bucket - confirm before calling with "".
    matching_objects = BUCKET.objects.filter(Prefix=key_or_prefix)
    matching_objects.delete()
def string_to_key(string: str, key: str) -> None:
    """Store a string as an object in COS.

    The string is encoded with ``str.encode()`` defaults before upload.

    Args:
        string (str): text to be stored.
        key (str): S3 key.
    """
    encoded_payload = string.encode()
    BUCKET.put_object(Key=key, Body=encoded_payload)
def bytes_to_key(some_bytes: bytes, key: str) -> None:
    """Upload bytes as object to COS.

    Delegates to ``BUCKET.put_object`` with the bytes passed unchanged as body.

    Args:
        some_bytes (bytes): object to be stored.
        key (str): S3 key.
    """
    # NOTE(review): BUCKET is not defined in the visible part of this module;
    # presumably a boto3 Bucket created elsewhere - confirm.
    BUCKET.put_object(Key=key, Body=some_bytes)
def string_from_key(key: str) -> str:
    """Fetch an object from COS and decode it as UTF-8 text.

    Args:
        key (str): S3 key.

    Returns:
        str: the object content decoded as UTF-8.
    """
    response = BUCKET.Object(key).get()
    raw_content = response["Body"].read()
    return raw_content.decode("utf-8")
def bytes_from_key(key: str) -> bytes:
    """Fetch an object from COS as raw bytes.

    Args:
        key (str): S3 key.

    Returns:
        bytes: the full object content read from the response body.
    """
    response = BUCKET.Object(key).get()
    body_stream = response["Body"]
    return body_stream.read()