| | |
| | | import yaml |
| | | from onnxruntime import (GraphOptimizationLevel, InferenceSession, |
| | | SessionOptions, get_available_providers, get_device) |
| | | from typeguard import check_argument_types |
| | | |
| | | import warnings |
| | | |
| | |
| | | class TokenIDConverter(): |
| | | def __init__(self, token_list: Union[List, str], |
| | | ): |
| | | check_argument_types() |
| | | |
| | | # self.token_list = self.load_token(token_path) |
| | | self.token_list = token_list |
| | | self.unk_symbol = token_list[-1] |
| | | self.token2id = {v: i for i, v in enumerate(self.token_list)} |
| | | self.unk_id = self.token2id[self.unk_symbol] |
| | | |
| | | # @staticmethod |
| | | # def load_token(file_path: Union[Path, str]) -> List: |
| | | # if not Path(file_path).exists(): |
| | | # raise TokenIDConverterError(f'The {file_path} does not exist.') |
| | | # |
| | | # with open(str(file_path), 'rb') as f: |
| | | # token_list = pickle.load(f) |
| | | # |
| | | # if len(token_list) != len(set(token_list)): |
| | | # raise TokenIDConverterError('The Token exists duplicated symbol.') |
| | | # return token_list |
| | | |
| | | def get_num_vocabulary_size(self) -> int: |
| | | return len(self.token_list) |
| | |
| | | return [self.token_list[i] for i in integers] |
| | | |
| | | def tokens2ids(self, tokens: Iterable[str]) -> List[int]: |
| | | token2id = {v: i for i, v in enumerate(self.token_list)} |
| | | if self.unk_symbol not in token2id: |
| | | raise TokenIDConverterError( |
| | | f"Unknown symbol '{self.unk_symbol}' doesn't exist in the token_list" |
| | | ) |
| | | unk_id = token2id[self.unk_symbol] |
| | | return [token2id.get(i, unk_id) for i in tokens] |
| | | |
| | | return [self.token2id.get(i, self.unk_id) for i in tokens] |
| | | |
| | | |
| | | class CharTokenizer(): |
| | |
| | | space_symbol: str = "<space>", |
| | | remove_non_linguistic_symbols: bool = False, |
| | | ): |
| | | check_argument_types() |
| | | |
| | | self.space_symbol = space_symbol |
| | | self.non_linguistic_symbols = self.load_symbols(symbol_value) |
| | |
| | | input_content: List[Union[np.ndarray, np.ndarray]]) -> np.ndarray: |
| | | input_dict = dict(zip(self.get_input_names(), input_content)) |
| | | try: |
| | | return self.session.run(None, input_dict) |
| | | return self.session.run(self.get_output_names(), input_dict) |
| | | except Exception as e: |
| | | raise ONNXRuntimeError('ONNXRuntime inferece failed.') from e |
| | | |
| | |
| | | if not model_path.is_file(): |
| | | raise FileExistsError(f'{model_path} is not a file.') |
| | | |
def split_to_mini_sentence(words: list, word_limit: int = 20):
    """Cut ``words`` into consecutive chunks of at most ``word_limit`` items.

    Args:
        words: Sequence of words to split.
        word_limit: Maximum number of words per chunk; must be greater than 1.

    Returns:
        A list of chunks. When ``words`` already fits within ``word_limit``
        (including when it is empty) the result is ``[words]`` holding the
        original object; otherwise every chunk is a slice of ``words`` and
        only the last one may be shorter than ``word_limit``.
    """
    assert word_limit > 1
    total = len(words)
    if total <= word_limit:
        return [words]
    # Stepping by word_limit yields every full chunk plus the trailing
    # remainder (if any) in a single pass.
    return [
        words[start:start + word_limit]
        for start in range(0, total, word_limit)
    ]
| | | |
def code_mix_split_words(text: str):
    """Split mixed-script text into words.

    The text is first split on whitespace. Within each piece, consecutive
    characters whose UTF-8 encoding is a single byte (ASCII) are grouped into
    one word, while every multi-byte character becomes a word of its own.

    Args:
        text: Input text.

    Returns:
        The list of words in order of appearance.
    """
    words = []
    for piece in text.split():
        # Whitespace splitting guarantees ``piece`` contains no spaces.
        ascii_run = []
        for ch in piece:
            if len(ch.encode()) == 1:
                # Single-byte (ASCII) character: extend the current run.
                ascii_run.append(ch)
                continue
            # Multi-byte character: flush any pending ASCII run, then emit
            # the character itself as a standalone word.
            if ascii_run:
                words.append("".join(ascii_run))
                ascii_run = []
            words.append(ch)
        if ascii_run:
            words.append("".join(ascii_run))
    return words
| | | |
| | | def read_yaml(yaml_path: Union[str, Path]) -> Dict: |
| | | if not Path(yaml_path).exists(): |
| | |
| | | |
| | | |
| | | @functools.lru_cache() |
| | | def get_logger(name='rapdi_paraformer'): |
| | | def get_logger(name='funasr_onnx'): |
| | | """Initialize and get a logger by name. |
| | | If the logger has not been initialized, this method will initialize the |
| | | logger by adding one or two handlers, otherwise the initialized logger will |
| | |
| | | logger.addHandler(sh) |
| | | logger_initialized[name] = True |
| | | logger.propagate = False |
| | | logging.basicConfig(level=logging.ERROR) |
| | | return logger |