Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
File size: 2,658 Bytes
fd1b271 210796c fd1b271 210796c fd1b271 210796c fd1b271 210796c fd1b271 210796c fd1b271 210796c fd1b271 210796c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
import os
import json
import io
import zipfile
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
class ZipDownloader:
    """Download a ZIP archive from Google Drive and extract it locally.

    Authenticates with service-account credentials and uses a Drive v3
    client for both the metadata lookup and the media download.
    """

    def __init__(self, service_account_json: str):
        """Create the Drive v3 client.

        Args:
            service_account_json: The service-account key as a JSON string
                (the document itself, not a path to it).
        """
        self.creds = Credentials.from_service_account_info(json.loads(service_account_json))
        self.service = build("drive", "v3", credentials=self.creds)

    def get_drive_file_size(self, file_id: str) -> int:
        """Return the size in bytes that Drive reports for ``file_id``.

        Args:
            file_id: Drive ID of the file to inspect.

        Returns:
            The ``size`` metadata field converted to ``int``.

        Raises:
            KeyError: If the metadata response contains no ``size`` field.
                NOTE(review): Drive presumably omits it for files without
                binary content (e.g. native Google Docs) -- confirm.
        """
        file_metadata = self.service.files().get(fileId=file_id, fields="size").execute()
        return int(file_metadata["size"])

    def download_zip_from_drive(self, file_id: str, output_path: str) -> str:
        """Download a Drive file and save it as ``downloaded.zip``.

        The payload is buffered in memory and its length is compared with
        the size reported by Drive before anything is written, so a size
        mismatch is detected without creating or overwriting the target
        file.

        Args:
            file_id: Drive ID of the ZIP file to download.
            output_path: Directory that receives ``downloaded.zip``; it is
                created if it does not exist.

        Returns:
            Path of the saved ZIP file.

        Raises:
            IOError: If the downloaded byte count differs from the size
                reported by Drive.
        """
        expected_size = self.get_drive_file_size(file_id)
        local_zip_path = os.path.join(output_path, "downloaded.zip")
        os.makedirs(output_path, exist_ok=True)

        # Buffer in memory instead of writing directly to disk, so the size
        # can be validated before the file is created.
        buffer = io.BytesIO()
        request = self.service.files().get_media(fileId=file_id)
        downloader = MediaIoBaseDownload(buffer, request)

        print(f"⬇️ Downloading ZIP file from Drive ID: {file_id}")
        done = False
        while not done:
            status, done = downloader.next_chunk()
            print(f" ⏬ Progress: {int(status.progress() * 100)}%")

        # getbuffer() exposes the buffered bytes as a memoryview without
        # copying them; getvalue()/read() would each duplicate the payload.
        with buffer.getbuffer() as payload:
            actual_size = payload.nbytes
            print(f"📦 Expected size: {expected_size} bytes")
            print(f"📥 Downloaded size: {actual_size} bytes")
            if actual_size != expected_size:
                raise IOError(f"❌ Downloaded file is incomplete! {actual_size} < {expected_size}")
            with open(local_zip_path, "wb") as f:
                f.write(payload)

        print(f"✅ ZIP saved to: {local_zip_path}")
        return local_zip_path

    def unzip(self, zip_path: str, extract_to: str) -> None:
        """Extract a ZIP file into a directory after an integrity check.

        Args:
            zip_path: Path of the ZIP archive to extract.
            extract_to: Destination directory; created if it does not exist.

        Raises:
            zipfile.BadZipFile: If the archive cannot be opened as a ZIP or
                one of its members fails the integrity check.
            EOFError: If the archive ends unexpectedly while being read.
        """
        print(f"📂 Extracting ZIP: {zip_path} -> {extract_to}")
        os.makedirs(extract_to, exist_ok=True)
        try:
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                # testzip() returns the name of the first member that fails
                # its check, or None when every member is intact.
                bad_file = zip_ref.testzip()
                if bad_file is not None:
                    raise zipfile.BadZipFile(f"Corrupted file inside ZIP: {bad_file}")
                zip_ref.extractall(extract_to)
            print("✅ Extraction complete.")
        except zipfile.BadZipFile as e:
            print(f"❌ ZIP file is corrupted: {e}")
            raise
        except EOFError as e:
            print(f"❌ Unexpected end of file during extraction: {e}")
            raise
|