| | |
| | | from typing import Generator, Union |
| | | |
| | | import requests |
| | | from urllib.parse import urlparse |
| | | |
| | | |
def download_from_url(url):
    """Download ``url`` into a fresh temporary directory and return the file path.

    The content is fetched with ``HTTPStorage().read(url)`` and written to
    ``<temp dir>/<last component of the URL path>``.

    Args:
        url (str): Location to fetch. It must carry a scheme
            (e.g. ``https://``); scheme-less strings are rejected.

    Returns:
        str: Path of the downloaded file. The temporary directory is not
        removed by this function; the caller owns it.

    Raises:
        AssertionError: If ``url`` has no scheme, i.e. nothing was downloaded.
    """
    parsed = urlparse(url)
    if not parsed.scheme:
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O`` (which would make this return None).
        raise AssertionError(f"failed to download: {url}")

    # bytes
    data = HTTPStorage().read(url)

    # mkdtemp() creates a directory that persists until the caller removes it.
    # The previous ``TemporaryDirectory().name`` took the path from an object
    # whose finalizer deletes the directory, then re-created it by hand.
    work_dir = tempfile.mkdtemp()

    # Derive the name from the URL *path* only, so a query string or fragment
    # does not end up in the file name. Fall back to a fixed name when the
    # path has no final component (e.g. "https://host/").
    file_name = os.path.basename(parsed.path) or "downloaded_file"
    file_path = os.path.join(work_dir, file_name)
    with open(file_path, "wb") as fb:
        fb.write(data)
    return file_path
| | | |
| | | |
| | | class Storage(metaclass=ABCMeta): |
| | |
| | | pass |
| | | |
| | | @abstractmethod |
| | | def write_text(self, |
| | | obj: str, |
| | | filepath: Union[str, Path], |
| | | encoding: str = 'utf-8') -> None: |
| | | def write_text(self, obj: str, filepath: Union[str, Path], encoding: str = "utf-8") -> None: |
| | | pass |
| | | |
| | | |
| | |
| | | Returns: |
| | | bytes: Expected bytes object. |
| | | """ |
| | | with open(filepath, 'rb') as f: |
| | | with open(filepath, "rb") as f: |
| | | content = f.read() |
| | | return content |
| | | |
| | | def read_text(self, |
| | | filepath: Union[str, Path], |
| | | encoding: str = 'utf-8') -> str: |
| | | def read_text(self, filepath: Union[str, Path], encoding: str = "utf-8") -> str: |
| | | """Read data from a given ``filepath`` with 'r' mode. |
| | | |
| | | Args: |
| | |
| | | Returns: |
| | | str: Expected text reading from ``filepath``. |
| | | """ |
| | | with open(filepath, 'r', encoding=encoding) as f: |
| | | with open(filepath, "r", encoding=encoding) as f: |
| | | value_buf = f.read() |
| | | return value_buf |
| | | |
| | |
| | | if dirname and not os.path.exists(dirname): |
| | | os.makedirs(dirname, exist_ok=True) |
| | | |
| | | with open(filepath, 'wb') as f: |
| | | with open(filepath, "wb") as f: |
| | | f.write(obj) |
| | | |
| | | def write_text(self, |
| | | obj: str, |
| | | filepath: Union[str, Path], |
| | | encoding: str = 'utf-8') -> None: |
| | | def write_text(self, obj: str, filepath: Union[str, Path], encoding: str = "utf-8") -> None: |
| | | """Write data to a given ``filepath`` with 'w' mode. |
| | | |
| | | Note: |
| | |
| | | if dirname and not os.path.exists(dirname): |
| | | os.makedirs(dirname, exist_ok=True) |
| | | |
| | | with open(filepath, 'w', encoding=encoding) as f: |
| | | with open(filepath, "w", encoding=encoding) as f: |
| | | f.write(obj) |
| | | |
@contextlib.contextmanager
def as_local_path(self, filepath: Union[str, Path]) -> Generator[Union[str, Path], None, None]:
    """Hand ``filepath`` back unchanged.

    Present only so that this storage exposes the same context-manager API
    as the others; no copy or download takes place.

    Args:
        filepath (str or Path): Path to expose inside the ``with`` block.

    Yields:
        str or Path: The very same ``filepath`` object.
    """
    local_path = filepath
    yield local_path
| | | |
| | |
| | | return r.text |
| | | |
| | | @contextlib.contextmanager |
| | | def as_local_path( |
| | | self, filepath: str) -> Generator[Union[str, Path], None, None]: |
| | | def as_local_path(self, filepath: str) -> Generator[Union[str, Path], None, None]: |
| | | """Download a file from ``filepath``. |
| | | |
| | | ``as_local_path`` is decorated by :meth:`contextlib.contextmanager`. It |
| | |
| | | os.remove(f.name) |
| | | |
def write(self, obj: bytes, url: Union[str, Path]) -> None:
    """Reject the call: writing bytes is not available for HTTP storage.

    Args:
        obj (bytes): Ignored.
        url (str or Path): Ignored.

    Raises:
        NotImplementedError: Always.
    """
    reason = "write is not supported by HTTP Storage"
    raise NotImplementedError(reason)
| | | |
def write_text(self, obj: str, url: Union[str, Path], encoding: str = "utf-8") -> None:
    """Reject the call: writing text is not available for HTTP storage.

    Args:
        obj (str): Ignored.
        url (str or Path): Ignored.
        encoding (str): Ignored.

    Raises:
        NotImplementedError: Always.
    """
    reason = "write_text is not supported by HTTP Storage"
    raise NotImplementedError(reason)
| | | |
| | | |
| | | class OSSStorage(Storage): |
| | |
| | | |
def __init__(self, oss_config_file=None):
    """Placeholder constructor; OSS storage cannot be instantiated yet.

    Args:
        oss_config_file: Currently unused.

    Raises:
        NotImplementedError: Always.
    """
    # read from config file or env var
    reason = "OSSStorage.__init__ to be implemented in the future"
    raise NotImplementedError(reason)
| | | |
def read(self, filepath):
    """Placeholder for reading bytes from OSS.

    Args:
        filepath: Ignored.

    Raises:
        NotImplementedError: Always.
    """
    reason = "OSSStorage.read to be implemented in the future"
    raise NotImplementedError(reason)
| | | |
def read_text(self, filepath, encoding="utf-8"):
    """Placeholder for reading text from OSS.

    Args:
        filepath: Ignored.
        encoding: Ignored.

    Raises:
        NotImplementedError: Always.
    """
    reason = "OSSStorage.read_text to be implemented in the future"
    raise NotImplementedError(reason)
| | | |
| | | @contextlib.contextmanager |
| | | def as_local_path( |
| | | self, filepath: str) -> Generator[Union[str, Path], None, None]: |
| | | def as_local_path(self, filepath: str) -> Generator[Union[str, Path], None, None]: |
| | | """Download a file from ``filepath``. |
| | | |
| | | ``as_local_path`` is decorated by :meth:`contextlib.contextmanager`. It |
| | |
| | | os.remove(f.name) |
| | | |
def write(self, obj: bytes, filepath: Union[str, Path]) -> None:
    """Placeholder for writing bytes to OSS.

    Args:
        obj (bytes): Ignored.
        filepath (str or Path): Ignored.

    Raises:
        NotImplementedError: Always.
    """
    reason = "OSSStorage.write to be implemented in the future"
    raise NotImplementedError(reason)
| | | |
def write_text(self, obj: str, filepath: Union[str, Path], encoding: str = "utf-8") -> None:
    """Placeholder for writing text to OSS.

    Args:
        obj (str): Ignored.
        filepath (str or Path): Ignored.
        encoding (str): Ignored.

    Raises:
        NotImplementedError: Always.
    """
    reason = "OSSStorage.write_text to be implemented in the future"
    raise NotImplementedError(reason)
| | | |
| | | |
| | | G_STORAGES = {} |
| | |
| | | |
| | | class File(object): |
| | | _prefix_to_storage: dict = { |
| | | 'oss': OSSStorage, |
| | | 'http': HTTPStorage, |
| | | 'https': HTTPStorage, |
| | | 'local': LocalStorage, |
| | | "oss": OSSStorage, |
| | | "http": HTTPStorage, |
| | | "https": HTTPStorage, |
| | | "local": LocalStorage, |
| | | } |
| | | |
| | | @staticmethod |
| | | def _get_storage(uri): |
| | | assert isinstance(uri, |
| | | str), f'uri should be str type, but got {type(uri)}' |
| | | assert isinstance(uri, str), f"uri should be str type, but got {type(uri)}" |
| | | |
| | | if '://' not in uri: |
| | | if "://" not in uri: |
| | | # local path |
| | | storage_type = 'local' |
| | | storage_type = "local" |
| | | else: |
| | | prefix, _ = uri.split('://') |
| | | prefix, _ = uri.split("://") |
| | | storage_type = prefix |
| | | |
| | | assert storage_type in File._prefix_to_storage, \ |
| | | f'Unsupported uri {uri}, valid prefixs: '\ |
| | | f'{list(File._prefix_to_storage.keys())}' |
| | | assert storage_type in File._prefix_to_storage, ( |
| | | f"Unsupported uri {uri}, valid prefixs: " f"{list(File._prefix_to_storage.keys())}" |
| | | ) |
| | | |
| | | if storage_type not in G_STORAGES: |
| | | G_STORAGES[storage_type] = File._prefix_to_storage[storage_type]() |
| | |
| | | return storage.read(uri) |
| | | |
| | | @staticmethod |
| | | def read_text(uri: Union[str, Path], encoding: str = 'utf-8') -> str: |
| | | def read_text(uri: Union[str, Path], encoding: str = "utf-8") -> str: |
| | | """Read data from a given ``filepath`` with 'r' mode. |
| | | |
| | | Args: |
| | |
| | | return storage.write(obj, uri) |
| | | |
| | | @staticmethod |
| | | def write_text(obj: str, uri: str, encoding: str = 'utf-8') -> None: |
| | | def write_text(obj: str, uri: str, encoding: str = "utf-8") -> None: |
| | | """Write data to a given ``filepath`` with 'w' mode. |
| | | |
| | | Note: |