"""Convert an evaluation summary CSV into per-model leaderboard JSON files.

The script locates a ``.csv`` file under ``./outputs/demo/<run>/summary``,
reads it as a table whose first row is a header and whose columns from index 4
onwards each belong to one model, and writes one ``<model id>.json`` file per
model column into the output directory.

Environment variables read at import time:

* ``README_META`` -- JSON text stored as each model's ``metadata``
  (default ``'{}'``).
* ``DESCRIPTION`` -- stored as each model's ``description`` (default ``''``).
* ``MODE`` -- stored as each model's ``mode`` (default ``''``).
"""
import csv
from datetime import datetime
import json
import os

import pydantic
import yaml

README_META = os.getenv('README_META', '{}')
DESCRIPTION = os.getenv('DESCRIPTION', '')
MODE = os.getenv('MODE', '')

# Columns 0-3 of the CSV describe the dataset row (name, version, measure
# name, type); model result columns start at this index.
_FIRST_MODEL_COLUMN = 4


class _DatasetRow(pydantic.BaseModel):
    """One dataset row of the summary CSV."""

    csv_index: int  # index of the row inside the parsed CSV table
    name: str  # dataset name
    version: str
    measure_name: str  # metric name
    type_: str


class _ModelEntry(pydantic.BaseModel):
    """One model column of the summary CSV plus the metadata written out."""

    csv_index: int  # index of the column inside the parsed CSV table
    id: str
    description: str
    updatedAt: str
    mode: str
    results: dict
    metadata: dict


def _model_to_dict(model: pydantic.BaseModel) -> dict:
    """Return *model* as a plain dict on both pydantic v1 and v2."""
    dump = getattr(model, 'model_dump', None)
    if callable(dump):
        # pydantic v2: ``.dict()`` still works but emits a deprecation warning.
        return dump()
    return model.dict()


def parse_yaml_from_readme_to_dict(readme_path: str):
    """Parse the YAML front matter of a README file.

    The front matter is the text between the first and the second ``---``
    marker of the file.

    Args:
        readme_path: Path of the README file to read.

    Returns:
        The object produced by the YAML parser (normally a dict). An empty
        dict is returned when the front matter is missing, empty, or cannot
        be parsed; the problem is printed rather than raised.
    """
    with open(readme_path, 'r', encoding='utf-8') as f:
        yaml_str = f.read()

    parts = yaml_str.split("---")
    try:
        # NOTE(review): FullLoader can still construct some non-trivial
        # Python objects. If the README may come from an untrusted source,
        # switch to yaml.safe_load.
        obj = yaml.load(parts[1], Loader=yaml.FullLoader)
    except Exception as e:
        # Deliberately best-effort: a README without usable front matter
        # (including one with no '---' marker at all) yields no metadata.
        print(e, yaml_str)
        obj = {}
    # An empty front matter block parses to None; keep the documented
    # "dict" contract for callers.
    return {} if obj is None else obj


def get_target_csv_file_from_outputs(output_root: str = './outputs/demo'):
    """Locate the summary CSV produced by a run.

    The first sub-directory of *output_root* (in sorted order) is used, and
    the first ``.csv`` file (in sorted order) inside its ``summary`` folder
    is returned. Sorting makes the choice deterministic, whereas the raw
    ``os.listdir`` order is arbitrary.

    Args:
        output_root: Directory that contains one sub-directory per run.

    Returns:
        Path of the selected CSV file.

    Raises:
        FileNotFoundError: If *output_root* has no sub-directory or the
            ``summary`` folder contains no ``.csv`` file.
    """
    run_dirs = sorted(
        entry for entry in os.listdir(output_root)
        if os.path.isdir(os.path.join(output_root, entry))
    )
    if not run_dirs:
        raise FileNotFoundError(f'no run directory found in {output_root}')

    summary_dir = os.path.join(output_root, run_dirs[0], 'summary')
    csv_files = sorted(
        entry for entry in os.listdir(summary_dir) if entry.endswith('.csv')
    )
    if not csv_files:
        raise FileNotFoundError(f'no .csv file found in {summary_dir}')
    return os.path.join(summary_dir, csv_files[0])


def _collect_datasets(data: list) -> list:
    """Build a ``_DatasetRow`` for every usable data row of the CSV table."""
    datasets = []
    for row_index, row in enumerate(data[1:], start=1):
        if not row:
            # csv.reader yields an empty list for a blank line.
            continue
        name = row[0].strip()
        if '---' in name:
            print(f'skip non-row line: {name}')
            continue
        if len(row) < _FIRST_MODEL_COLUMN:
            print(f'skip malformed row {row_index}: {row}')
            continue
        print(f'dataset: {row[0]}, version: {row[1]}, '
              f'measure_name: {row[2]},type_: {row[3]}')
        datasets.append(_DatasetRow(csv_index=row_index,
                                    name=name,
                                    version=row[1],
                                    measure_name=row[2],
                                    type_=row[3]))
    return datasets


def _collect_results(data: list, datasets: list, column: int) -> dict:
    """Map dataset name to the numeric score found in *column*.

    Cells that are missing, equal to ``'-'`` or not parseable as a float are
    left out. Rows sharing a dataset name overwrite each other, so the last
    such row wins.
    """
    results = {}
    for dataset in datasets:
        row = data[dataset.csv_index]
        if column >= len(row):
            continue
        value_str = row[column].strip()
        if value_str == '-':
            continue
        try:
            results[dataset.name] = float(value_str)
        except ValueError:
            # Non-numeric cell: treated as "no result" for this dataset.
            continue
    return results


def parse_results_to_json(csv_path: str, output_dir: str):
    """Write one JSON file per model column of the summary CSV.

    Each output file is named after the model id (with ``/`` and ``@``
    replaced by ``-``) and contains the keys ``id``, ``description``,
    ``updatedAt``, ``mode``, ``results`` and ``metadata``. Columns whose
    header cell is empty are skipped.

    Args:
        csv_path: Path of the summary CSV to read.
        output_dir: Directory receiving the JSON files; created if missing.

    Raises:
        FileNotFoundError: If *csv_path* does not exist.
    """
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f'csv_path: {csv_path} not exists')
    os.makedirs(output_dir, exist_ok=True)

    # newline='' is what the csv module requires for correct handling of
    # quoted line breaks. NOTE(review): assumes the CSV is UTF-8 encoded --
    # confirm against the tool that produces it.
    with open(csv_path, 'r', newline='', encoding='utf-8') as file:
        data = list(csv.reader(file))
    print(data)
    if not data:
        print(f'csv_path: {csv_path} is empty, nothing to export')
        return

    datasets = _collect_datasets(data)

    header = data[0]
    # One timestamp per run so that every file written together agrees.
    updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    models = []
    for column in range(_FIRST_MODEL_COLUMN, len(header)):
        print(f'model name: {header[column]}')
        models.append(_ModelEntry(csv_index=column,
                                  id=header[column],
                                  description=str(DESCRIPTION),
                                  updatedAt=updated_at,
                                  mode=str(MODE),
                                  results={},
                                  metadata=json.loads(README_META)))

    dict_models = []
    for model in models:
        res = _collect_results(data, datasets, model.csv_index)
        print(res)
        dict_model = _model_to_dict(model)
        dict_model['results'] = res
        dict_models.append(dict_model)

    for dict_model in dict_models:
        model_id = dict_model.get("id")
        if model_id == '':
            continue
        # csv_index is bookkeeping only and is not part of the output.
        del dict_model['csv_index']
        # '/' and '@' are replaced by '-' to build the file name.
        target_filename = model_id.replace('/', '-').replace('@', '-')
        m_path = os.path.join(output_dir, target_filename + '.json')
        # ensure_ascii=False emits raw non-ASCII characters, so the file
        # encoding must be explicit rather than locale-dependent.
        with open(m_path, 'w', encoding='utf-8') as f:
            f.write(json.dumps(dict_model, indent=4, ensure_ascii=False))


if __name__ == '__main__':
    csv_file = get_target_csv_file_from_outputs()
    parse_results_to_json(csv_file, './leaderboards')