File size: 2,658 Bytes
fd1b271
 
 
 
 
 
 
 
 
 
 
 
 
210796c
 
 
 
fd1b271
210796c
fd1b271
 
210796c
 
 
 
 
fd1b271
 
 
 
 
 
 
210796c
 
 
 
 
 
 
 
 
 
 
 
fd1b271
 
210796c
 
fd1b271
 
210796c
fd1b271
 
210796c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import os
import json
import io
import zipfile
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload

class ZipDownloader:
    """Download a ZIP archive from Google Drive and extract it locally.

    Authentication uses a service account; the Drive v3 client is built once
    in the constructor and reused by every method.
    """

    def __init__(self, service_account_json: str):
        """Build the Drive v3 client from service-account credentials.

        Args:
            service_account_json: The service-account key as a JSON *string*
                (it is parsed with ``json.loads``), not a path to a key file.
        """
        self.creds = Credentials.from_service_account_info(json.loads(service_account_json))
        self.service = build("drive", "v3", credentials=self.creds)

    def get_drive_file_size(self, file_id: str) -> int:
        """Return the size in bytes that Drive reports for ``file_id``.

        Only the ``size`` metadata field is requested.

        Raises:
            KeyError: If the metadata response carries no ``size`` entry.
                NOTE(review): Drive presumably omits it for items without
                binary content (native Google Docs, folders) - confirm.
        """
        file_metadata = self.service.files().get(fileId=file_id, fields='size').execute()
        return int(file_metadata['size'])

    def download_zip_from_drive(self, file_id: str, output_path: str) -> str:
        """Download a Drive file to ``<output_path>/downloaded.zip``.

        The payload is collected in memory first and written to disk only
        after its length matches the size reported by Drive, so a truncated
        download never produces a file at the destination path.

        Args:
            file_id: Drive ID of the file to download.
            output_path: Directory that receives ``downloaded.zip``; it is
                created if it does not exist.

        Returns:
            The path of the saved ZIP file.

        Raises:
            IOError: If the downloaded byte count differs from the size in
                the Drive metadata.
        """
        expected_size = self.get_drive_file_size(file_id)
        local_zip_path = os.path.join(output_path, "downloaded.zip")
        os.makedirs(output_path, exist_ok=True)

        # Buffer in memory so nothing touches the disk until the size check passes.
        buffer = io.BytesIO()
        request = self.service.files().get_media(fileId=file_id)
        downloader = MediaIoBaseDownload(buffer, request)

        print(f"⬇️ Downloading ZIP file from Drive ID: {file_id}")
        done = False
        while not done:
            status, done = downloader.next_chunk()
            print(f"   ⏬ Progress: {int(status.progress() * 100)}%")

        # getbuffer() exposes the bytes without copying them; getvalue() and
        # read() would each duplicate the whole archive in memory.
        with buffer.getbuffer() as payload:
            actual_size = payload.nbytes
            print(f"📦 Expected size: {expected_size} bytes")
            print(f"📥 Downloaded size: {actual_size} bytes")

            if actual_size != expected_size:
                # The check is for inequality, so report a mismatch rather
                # than claiming the download is necessarily too short.
                raise IOError(
                    f"❌ Downloaded size does not match Drive metadata: "
                    f"{actual_size} != {expected_size}"
                )

            with open(local_zip_path, 'wb') as f:
                f.write(payload)

        print(f"✅ ZIP saved to: {local_zip_path}")
        return local_zip_path

    def unzip(self, zip_path: str, extract_to: str) -> None:
        """Verify a ZIP archive and extract all of its members.

        Every member's CRC is checked with ``testzip()`` before anything is
        extracted, so a damaged archive is rejected up front.

        Args:
            zip_path: Path of the ZIP file to extract.
            extract_to: Destination directory; created if it does not exist.

        Raises:
            zipfile.BadZipFile: If the file is not a valid ZIP or a member
                fails its integrity check.
            EOFError: If the archive ends unexpectedly while being read.
        """
        print(f"📂 Extracting ZIP: {zip_path} -> {extract_to}")
        os.makedirs(extract_to, exist_ok=True)

        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # testzip() returns the first corrupt member's name, or None.
                bad_file = zip_ref.testzip()
                if bad_file is not None:
                    raise zipfile.BadZipFile(f"Corrupted file inside ZIP: {bad_file}")
                zip_ref.extractall(extract_to)
            print("✅ Extraction complete.")
        except zipfile.BadZipFile as e:
            print(f"❌ ZIP file is corrupted: {e}")
            raise
        except EOFError as e:
            print(f"❌ Unexpected end of file during extraction: {e}")
            raise