#%% md # **install python packages** # # !pip install scipy openai tiktoken retry dashscope loguru #%% from multiprocessing import Process, Manager import json import os from pprint import pprint import re from tqdm import tqdm import random import uuid import openai import tiktoken import json import numpy as np import requests from retry import retry from scipy import sparse #from rank_bm25 import BM25Okapi #import jieba from http import HTTPStatus import dashscope from concurrent.futures import ThreadPoolExecutor, as_completed from loguru import logger import json import time from tqdm import tqdm logger.remove() # 移除默认的控制台输出 logger.add("logs/app_{time:YYYY-MM-DD}.log", level="INFO", rotation="00:00", retention="10 days", compression="zip") MODEL_NAME = 'qwen2-7b-instruct' #%% # 注意:这里需要填入你的key~ 咱们在第二步申请的。 dashscope.api_key="sk-e6a29593d7e24d9a88daa5ddf84c9a53" #%% md # 这段代码定义了一个名为 call_qwen_api 的函数,该函数用于调用一个名为 dashscope.Generation 的 API 来生成文本。以下是该代码的功能、用途和特点的详细介绍: # # * 功能 # 调用 API 生成文本:该函数通过传递一个模型名称 (MODEL_NAME) 和一个查询 (query) 来调用 dashscope.Generation.call 方法,生成相应的文本。 # 处理 API 响应:函数会检查 API 的响应状态码,如果状态码为 HTTPStatus.OK,则提取并返回生成的文本内容。如果状态码不是 HTTPStatus.OK,则打印错误信息并抛出异常。 # * 用途 # 文本生成:该函数主要用于通过调用外部 API 来生成文本,适用于需要动态生成内容的场景,如聊天机器人、内容创作辅助等。 # 错误处理:通过检查 API 响应状态码并处理错误情况,确保在调用失败时能够及时发现并处理问题。 # * 特点 # 重试机制:函数使用了 @retry(delay=3, tries=3) 装饰器,这意味着在调用 API 失败时,函数会自动重试最多 3 次,每次重试间隔 3 秒。 # 消息格式:在调用 API 时,设置了 result_format='message',表示期望的响应格式是消息格式。 # 错误处理:在 API 调用失败时,函数会打印详细的错误信息,包括请求 ID、状态码、错误代码和错误消息,并抛出异常,以便上层调用者能够捕获并处理这些错误。 #%% def api_retry(MODEL_NAME, query): max_retries = 5 retry_delay = 60 # in seconds attempts = 0 while attempts < max_retries: try: return call_qwen_api(MODEL_NAME, query) except Exception as e: attempts += 1 if attempts < max_retries: logger.warning(f"Attempt {attempts} failed for text: {query}. Retrying in {retry_delay} seconds...") time.sleep(retry_delay) else: logger.error(f"All {max_retries} attempts failed for text: {query}. Error: {e}") raise #%% def call_qwen_api(MODEL_NAME, query): # 这里采用dashscope的api调用模型推理,通过http传输的json封装返回结果 messages = [ {'role': 'user', 'content': query}] response = dashscope.Generation.call( MODEL_NAME, messages=messages, result_format='message', # set the result is message format. ) if response.status_code == HTTPStatus.OK: # print(response) return response['output']['choices'][0]['message']['content'] else: print('Request id: %s, Status code: %s, error code: %s, error message: %s' % ( response.request_id, response.status_code, response.code, response.message )) raise Exception() #%% # 这里定义了prompt推理模版 def get_prompt(problem, question, options): options = '\n'.join(f"{'ABCDEFG'[i]}. {o}" for i, o in enumerate(options)) prompt = f"""你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。请逐步分析问题并在最后一行输出答案,最后一行的格式为"答案是:A"。题目如下: ### 题目: {problem} ### 问题: {question} {options} """ # print(prompt) return prompt #%% # 这里使用extract抽取模获得抽取的结果 def extract(input_text): ans_pattern = re.compile(r"答案是:(.)", re.S) problems = ans_pattern.findall(input_text) # print(problems) if(problems == ''): return 'A' return problems[0] #%% def process_datas(datas,MODEL_NAME): results = [] with ThreadPoolExecutor(max_workers=16) as executor: future_data = {} lasttask = '' lastmark = 0 lens = 0 for data in tqdm(datas, desc="Submitting tasks", total=len(datas)): problem = data['problem'] for id,question in enumerate(data['questions']): prompt = get_prompt(problem, question['question'], question['options'], ) future = executor.submit(api_retry, MODEL_NAME, prompt) future_data[future] = (data,id) time.sleep(0.6) # 控制每0.5秒提交一个任务 lens += 1 for future in tqdm(as_completed(future_data), total=lens, desc="Processing tasks"): # print('data',data) data = future_data[future][0] problem_id = future_data[future][1] try: res = future.result() extract_response = extract(res) # print('res',extract_response) data['questions'][problem_id]['answer'] = extract_response results.append(data) # print('data',data) except Exception as e: logger.error(f"Failed to process text: {data}. Error: {e}") return results #%% def main(ifn, ofn): if os.path.exists(ofn): pass data = [] # 按行读取数据 with open(ifn) as reader: for line in reader: sample = json.loads(line) data.append(sample) datas = data # print(data) # 均匀地分成多个数据集 return_list = process_datas(datas,MODEL_NAME) print(len(return_list)) print("All tasks finished!") return return_list #%% def evaluate(ofn): data = [] with open(ofn) as reader: for line in reader: sample = json.loads(line) data.append(sample) pse = 0 cnt = 0 tot = 0 for task in data: for question in task['questions']: if MODEL_NAME in question: tot += 1 cnt += question[MODEL_NAME] == question['answer'] else: pse += 1 print(cnt, tot, cnt/tot, pse) #%% if __name__ == '__main__': a = extract("""根据欧几里得算法,逐步解析计算两个数6和7的最大公约数(gcd)的步骤如下: 1. 判断6和7是否相等:不相等。 2. 判断6和7大小关系,7 > 6,所以用更大的数7减去较小的数6得到结果1。 3. 现在计算6和1的最大公约数。 4. 6 > 1,根据算法用更大的数6减去较小的数1得到结果5。 5. 再计算5和1的最大公约数。 6. 5 > 1,用5减去1得到结果4。 7. 再计算4和1的最大公约数。 8. 4 > 1,用4减去1得到结果3。 9. 再计算3和1的最大公约数。 10. 3 > 1,用3减去1得到结果2。 11. 再计算2和1的最大公约数。 12. 2 > 1,用2减去1得到结果1。 13. 最后计算1和1的最大公约数,两数相等,gcd即为这两个数,也就是1。 因此,6和7的最大公约数是1。 答案是:C.""") print(a) return_list = main('round1_test_data.jsonl', 'upload.jsonl') #%% def has_complete_answer(questions): # 这里假设完整答案的判断逻辑是:每个question都有一个'answer'键 for question in questions: if 'answer' not in question: return False return True def filter_problems(data): result = [] problem_set = set() for item in data: # print('处理的item' ,item) problem = item['problem'] if problem in problem_set: # 找到已存在的字典 for existing_item in result: if existing_item['problem'] == problem: # 如果当前字典有完整答案,替换已存在的字典 if has_complete_answer(item['questions']): existing_item['questions'] = item['questions'] existing_item['id'] = item['id'] break else: # 如果当前字典有完整答案,添加到结果列表 if has_complete_answer(item['questions']): result.append(item) problem_set.add(problem) return result #%% return_list return_list = filter_problems(return_list) sorted_data = sorted(return_list, key=lambda x: int(str(x['id'])[-3:])) print(sorted_data) #%% sorted_data #%% def find_missing_ids(dict_list): # 提取所有序号 extracted_ids = {int(d['id'][-3:]) for d in dict_list} # 创建0-500的序号集合 all_ids = set(range(500)) # 找出缺失的序号 missing_ids = all_ids - extracted_ids return sorted(missing_ids) # 示例字典列表 dict_list = sorted_data # 找出缺失的序号 missing_ids = find_missing_ids(dict_list) print("缺失的序号:", missing_ids) #%% len(missing_ids) #%% data = [] with open('round1_test_data.jsonl') as reader: for id,line in enumerate(reader): if(id in missing_ids): sample = json.loads(line) for question in sample['questions']: question['answer'] = 'A' sorted_data.append(sample) sorted_data = sorted(sorted_data, key=lambda x: int(str(x['id'])[-3:])) #%% with open('upload.jsonl', 'w') as writer: for sample in sorted_data: writer.write(json.dumps(sample, ensure_ascii=False)) writer.write('\n') #%%
1. api_retry 函数:
- 功能:对 call_qwen_api 函数进行重试封装。
- 参数:MODEL_NAME(模型名称)和 query(查询文本)。
- 逻辑:设置最大重试次数和重试间隔,在出现异常时进行重试,超过最大重试次数则记录错误日志并抛出异常。
2. call_qwen_api 函数:
- 功能:调用 dashscope.Generation API 生成文本。
- 参数:MODEL_NAME(模型名称)和 query(查询文本)。
- 逻辑:将查询文本封装为消息格式,调用 API 生成结果。如果请求成功,返回生成的文本;否则打印错误信息并抛出异常。
3. get_prompt 函数:
- 功能:根据给定的问题、选项生成符合模型输入格式的 prompt。
- 参数:problem(题目)、question(问题)和 options(选项列表)。
- 逻辑:将选项格式化为 "A. xxx\nB. xxx" 的形式,拼接题目、问题和格式化后的选项,生成完整的 prompt。
4. extract 函数:
- 功能:从模型生成的文本中提取预测的答案。
- 参数:input_text(模型生成的文本)。
- 逻辑:使用正则表达式匹配 "答案是:X" 格式的行,提取出预测的答案字母。如果未匹配到,默认返回 "A"。
5. process_datas 函数:
- 功能:并行处理多个问题,调用模型生成答案并收集结果。
- 参数:datas(包含题目、问题和选项的数据列表)和 model_name(模型名称)。
- 逻辑:
- 创建 ThreadPoolExecutor 进行并行处理。
- 遍历每个数据项,对其中的每个问题生成 prompt,使用 api_retry 函数提交查询并存储返回的 Future。
- 遍历完成的 Future,使用 extract 函数提取预测答案,将其添加到原始数据项中。
- 将更新后的数据项添加到结果列表中。
- 返回结果列表。
6. 日志记录:
- 使用 loguru 库进行日志记录。
- 配置日志记录器,将日志写入按天命名的文件中,设置日志级别为 INFO。
7. 程序执行流程:
- 首先,定义了一些常量和辅助函数,如 MODEL_NAME、api_retry、call_qwen_api、get_prompt 和 extract。
- 然后,定义了主要的处理函数 process_datas,用于并行处理问题数据集。
- 接着,可能会有一段代码(未提供)用于读取或准备输入数据。
- 最后,调用 process_datas 函数,传入准备好的数据列表和模型名称,开始处理问题并生成答案。
8. 并行处理:
- 代码使用了 Python 的 concurrent.futures 模块中的 ThreadPoolExecutor 来实现并行处理。
- 通过创建线程池,可以同时将多个问题提交给 API 进行处理,提高了效率。
- 使用 submit 方法提交任务,返回的 Future 对象用于后续获取结果。
- 通过 as_completed 函数,可以在任务完成后立即获取结果,而不必等待所有任务都完成。
9. 错误处理和重试:
- 在 call_qwen_api 函数中,通过检查 API 响应的状态码来判断请求是否成功。
- 如果请求失败,会打印错误信息并抛出异常,由上层的 api_retry 函数捕获。
- api_retry 函数实现了重试机制,在出现异常时会尝试重试指定次数,并在重试间隔一定时间。
- 如果所有重试都失败,会记录错误日志并再次抛出异常,由上层的调用者处理。
10. 日志记录:
- 代码使用了 loguru 库来记录日志。
- 通过 logger 对象,可以方便地在代码中添加日志记录语句。
- 日志记录器被配置为将日志写入按天命名的文件中,方便后续查看和分析。
- 日志级别设置为 INFO,可以根据需要调整为其他级别,如 DEBUG、WARNING 或 ERROR。
11. 代码可扩展性:
- 这段代码具有良好的可扩展性,可以根据需求进行调整和扩展。
- 可以修改 get_prompt 函数,以支持不同格式的问题和选项。例如,可以添加对问题类型的判断,根据类型生成不同的 prompt。
- 可以扩展 extract 函数,以支持更复杂的答案提取逻辑。例如,可以使用更高级的自然语言处理技术,如命名实体识别、关键词提取等,来从生成的文本中提取答案。
- 可以添加更多的数据预处理和后处理步骤,如数据清洗、过滤、格式化等,以提高数据质量和处理效果。
- 可以引入其他的 API 或模型,如知识图谱、搜索引擎等,与文本生成 API 结合使用,以获得更准确和全面的答案。
12. 性能优化:
- 代码中使用了并行处理,通过线程池提交任务,可以提高处理速度。但是,如果需要处理大规模数据集,可能需要进一步优化。
- 可以考虑使用多进程instead of多线程,利用多个 CPU 核心来并行处理任务。Python 的 multiprocessing 模块提供了简单的多进程支持。
- 可以对数据进行分批处理,避免一次性加载所有数据到内存中。可以使用生成器或迭代器来逐批读取数据,减少内存占用。
- 可以对频繁调用的函数或方法进行缓存,如使用 functools.lru_cache 装饰器对函数结果进行缓存,避免重复计算。
- 可以优化 API 调用过程,如使用异步 I/O 库(如 aiohttp)发送请求,减少阻塞等待时间。
13. 错误处理和日志记录:
- 代码中使用了异常处理和重试机制,可以处理 API 调用过程中的错误。但是,对于其他类型的错误,如数据格式错误、网络错误等,可能需要添加更多的异常处理逻辑。
- 可以在关键步骤添加更详细的日志记录,如记录请求参数、响应结果、异常信息等,方便问题定位和调试。
- 可以将日志发送到远程日志服务器,如 ELK 栈(Elasticsearch、Logstash、Kibana),集中管理和分析日志。
- 可以设置合理的日志级别和日志格式,避免记录过多不必要的信息,提高日志的可读性和可用性。
14. 代码工程化:
- 可以将代码组织成模块化的结构,如将数据处理、API 调用、答案提取等功能拆分成独立的模块,提高代码的可读性和可维护性。
- 可以编写单元测试,对关键函数和逻辑进行测试,确保代码的正确性和稳定性。
- 可以使用版本控制工具,如 Git,对代码进行管理和追踪,方便协作和部署。
- 可以添加必要的文档和注释,说明代码的功能、输入输出、调用方式等,方便其他开发者理解和使用。