| | |
| | | data_file_lists = f.readlines() |
| | | lines_for_each_th = (len(data_file_lists)-1)//cpu_cores + 1 |
| | | task_num = cpu_cores if len(data_file_lists) > cpu_cores else 1 |
| | | with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_cores) as executor: |
| | | |
| | | futures = [executor.submit(parse_context_length, data_file_lists[i*lines_for_each_th:(i+1)*lines_for_each_th], data_type) for i in range(task_num)] |
| | | # import pdb;pdb.set_trace() |
| | | if task_num > 1: |
| | | with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_cores) as executor: |
| | | |
| | | for future in concurrent.futures.as_completed(futures): |
| | | |
| | | json_dict[data_type].update(future.result()) |
| | | # print(json_dict) |
| | | futures = [executor.submit(parse_context_length, data_file_lists[i*lines_for_each_th:(i+1)*lines_for_each_th], data_type) for i in range(task_num)] |
| | | |
| | | for future in concurrent.futures.as_completed(futures): |
| | | |
| | | json_dict[data_type].update(future.result()) |
| | | else: |
| | | res = parse_context_length(data_file_lists, data_type) |
| | | json_dict[data_type].update(res) |
| | | |
| | | with open(jsonl_file_out, "w") as f: |
| | | for key in json_dict[data_type_list[0]].keys(): |
| | | jsonl_line = {"key": key} |
| | |
| | | jsonl_line = json.dumps(jsonl_line, ensure_ascii=False) |
| | | f.write(jsonl_line+"\n") |
| | | f.flush() |
| | | print(f"processed {len(json_dict[data_type_list[0]])} samples") |
| | | |
| | | else: |
| | | pass |