liugz18
2024-07-18 d80ac2fd2df4e7fb8a28acfa512bb11472b5cc99
funasr/download/file.py
@@ -8,6 +8,24 @@
from typing import Generator, Union
import requests
from urllib.parse import urlparse
def download_from_url(url):
    result = urlparse(url)
    file_path = None
    if result.scheme is not None and len(result.scheme) > 0:
        storage = HTTPStorage()
        # bytes
        data = storage.read(url)
        work_dir = tempfile.TemporaryDirectory().name
        if not os.path.exists(work_dir):
            os.makedirs(work_dir)
        file_path = os.path.join(work_dir, os.path.basename(url))
        with open(file_path, "wb") as fb:
            fb.write(data)
    assert file_path is not None, f"failed to download: {url}"
    return file_path
class Storage(metaclass=ABCMeta):
@@ -31,10 +49,7 @@
        pass
    @abstractmethod
    def write_text(self,
                   obj: str,
                   filepath: Union[str, Path],
                   encoding: str = 'utf-8') -> None:
    def write_text(self, obj: str, filepath: Union[str, Path], encoding: str = "utf-8") -> None:
        pass
@@ -50,13 +65,11 @@
        Returns:
            bytes: Expected bytes object.
        """
        with open(filepath, 'rb') as f:
        with open(filepath, "rb") as f:
            content = f.read()
        return content
    def read_text(self,
                  filepath: Union[str, Path],
                  encoding: str = 'utf-8') -> str:
    def read_text(self, filepath: Union[str, Path], encoding: str = "utf-8") -> str:
        """Read data from a given ``filepath`` with 'r' mode.
        Args:
@@ -67,7 +80,7 @@
        Returns:
            str: Expected text reading from ``filepath``.
        """
        with open(filepath, 'r', encoding=encoding) as f:
        with open(filepath, "r", encoding=encoding) as f:
            value_buf = f.read()
        return value_buf
@@ -86,13 +99,10 @@
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname, exist_ok=True)
        with open(filepath, 'wb') as f:
        with open(filepath, "wb") as f:
            f.write(obj)
    def write_text(self,
                   obj: str,
                   filepath: Union[str, Path],
                   encoding: str = 'utf-8') -> None:
    def write_text(self, obj: str, filepath: Union[str, Path], encoding: str = "utf-8") -> None:
        """Write data to a given ``filepath`` with 'w' mode.
        Note:
@@ -109,14 +119,11 @@
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname, exist_ok=True)
        with open(filepath, 'w', encoding=encoding) as f:
        with open(filepath, "w", encoding=encoding) as f:
            f.write(obj)
    @contextlib.contextmanager
    def as_local_path(
            self,
            filepath: Union[str,
                            Path]) -> Generator[Union[str, Path], None, None]:
    def as_local_path(self, filepath: Union[str, Path]) -> Generator[Union[str, Path], None, None]:
        """Only for unified API and do nothing."""
        yield filepath
@@ -136,8 +143,7 @@
        return r.text
    @contextlib.contextmanager
    def as_local_path(
            self, filepath: str) -> Generator[Union[str, Path], None, None]:
    def as_local_path(self, filepath: str) -> Generator[Union[str, Path], None, None]:
        """Download a file from ``filepath``.
        ``as_local_path`` is decorated by :meth:`contextlib.contextmanager`. It
@@ -163,14 +169,10 @@
            os.remove(f.name)
    def write(self, obj: bytes, url: Union[str, Path]) -> None:
        raise NotImplementedError('write is not supported by HTTP Storage')
        raise NotImplementedError("write is not supported by HTTP Storage")
    def write_text(self,
                   obj: str,
                   url: Union[str, Path],
                   encoding: str = 'utf-8') -> None:
        raise NotImplementedError(
            'write_text is not supported by HTTP Storage')
    def write_text(self, obj: str, url: Union[str, Path], encoding: str = "utf-8") -> None:
        raise NotImplementedError("write_text is not supported by HTTP Storage")
class OSSStorage(Storage):
@@ -178,20 +180,16 @@
    def __init__(self, oss_config_file=None):
        # read from config file or env var
        raise NotImplementedError(
            'OSSStorage.__init__ to be implemented in the future')
        raise NotImplementedError("OSSStorage.__init__ to be implemented in the future")
    def read(self, filepath):
        raise NotImplementedError(
            'OSSStorage.read to be implemented in the future')
        raise NotImplementedError("OSSStorage.read to be implemented in the future")
    def read_text(self, filepath, encoding='utf-8'):
        raise NotImplementedError(
            'OSSStorage.read_text to be implemented in the future')
    def read_text(self, filepath, encoding="utf-8"):
        raise NotImplementedError("OSSStorage.read_text to be implemented in the future")
    @contextlib.contextmanager
    def as_local_path(
            self, filepath: str) -> Generator[Union[str, Path], None, None]:
    def as_local_path(self, filepath: str) -> Generator[Union[str, Path], None, None]:
        """Download a file from ``filepath``.
        ``as_local_path`` is decorated by :meth:`contextlib.contextmanager`. It
@@ -217,15 +215,10 @@
            os.remove(f.name)
    def write(self, obj: bytes, filepath: Union[str, Path]) -> None:
        raise NotImplementedError(
            'OSSStorage.write to be implemented in the future')
        raise NotImplementedError("OSSStorage.write to be implemented in the future")
    def write_text(self,
                   obj: str,
                   filepath: Union[str, Path],
                   encoding: str = 'utf-8') -> None:
        raise NotImplementedError(
            'OSSStorage.write_text to be implemented in the future')
    def write_text(self, obj: str, filepath: Union[str, Path], encoding: str = "utf-8") -> None:
        raise NotImplementedError("OSSStorage.write_text to be implemented in the future")
G_STORAGES = {}
@@ -233,27 +226,26 @@
class File(object):
    _prefix_to_storage: dict = {
        'oss': OSSStorage,
        'http': HTTPStorage,
        'https': HTTPStorage,
        'local': LocalStorage,
        "oss": OSSStorage,
        "http": HTTPStorage,
        "https": HTTPStorage,
        "local": LocalStorage,
    }
    @staticmethod
    def _get_storage(uri):
        assert isinstance(uri,
                          str), f'uri should be str type, but got {type(uri)}'
        assert isinstance(uri, str), f"uri should be str type, but got {type(uri)}"
        if '://' not in uri:
        if "://" not in uri:
            # local path
            storage_type = 'local'
            storage_type = "local"
        else:
            prefix, _ = uri.split('://')
            prefix, _ = uri.split("://")
            storage_type = prefix
        assert storage_type in File._prefix_to_storage, \
            f'Unsupported uri {uri}, valid prefixs: '\
            f'{list(File._prefix_to_storage.keys())}'
        assert storage_type in File._prefix_to_storage, (
            f"Unsupported uri {uri}, valid prefixs: " f"{list(File._prefix_to_storage.keys())}"
        )
        if storage_type not in G_STORAGES:
            G_STORAGES[storage_type] = File._prefix_to_storage[storage_type]()
@@ -274,7 +266,7 @@
        return storage.read(uri)
    @staticmethod
    def read_text(uri: Union[str, Path], encoding: str = 'utf-8') -> str:
    def read_text(uri: Union[str, Path], encoding: str = "utf-8") -> str:
        """Read data from a given ``filepath`` with 'r' mode.
        Args:
@@ -304,7 +296,7 @@
        return storage.write(obj, uri)
    @staticmethod
    def write_text(obj: str, uri: str, encoding: str = 'utf-8') -> None:
    def write_text(obj: str, uri: str, encoding: str = "utf-8") -> None:
        """Write data to a given ``filepath`` with 'w' mode.
        Note: