多线程并发测试场景模拟

avatar
作者
筋斗云
阅读量:0

append ---通过append方法在列表末尾添加新对象。

zip -- 可以将两个列表合并转化为字典

reverse -- 通过列表自带的方法 reverse 进行倒叙

多线程执行python 并发请求接口

方法一: import requests import threading  # 提交巡检  模拟多线程  def submit_xun(url):     # url ="http://*************/api/v1.0/task/dailySubmit"     header = {         "token":"Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJtb2ZhbmciLCJpc3MiOiJzc28iLCJpZCI6IjAyODM2IiwibmFtZSI6Iuadjua"     }     json = {         "taskCategoryDtos": [             {                 "id": 26894,                 "inspectTaskCode": "XJ2024073100012",                 "inspectTaskAreaCode": "a813a748034e4017aa5af36d53931d01",                 "inspectTaskCategoryCode": "f4354fe00cfe414489f7a4c21e8283bd",                 "inspectCategoryShowName": "驻店消防检查/灭火器有效期/管家",                 "result": 20,                 "resultVal": "不合格",                 "resultRemark": "已驳回",                 "resultFiles": None,                 "resultFilesUri": None,                 "handleType": 10,                 "handleTime": None,                 "handlePersonCode": None,                 "handlePersonName": None,                 "inspectCategoryCode": "XJ004712",                 "hasAbnormal": True,                 "modifyBy": None,                 "modifyAt": None,                 "checkStatus": 10,                 "title": "消防栓",                 "abnormalType": 30,                 "score": None,                 "actualScore": None,                 "scoreLabel": 10,                 "scoreLabelVal": "无",                 "resultType": 20,                 "contentExplain": None,                 "originalInspectCategoryShowName": "驻店消防检查/灭火器有效期/管家/灭火器",                 "resultTypes": [                     {                         "resultType": 10,                         "resultTypeName": "合格"                     },                     {                         "resultType": 20,                         "resultTypeName": "不合格"                     },                     {                         "resultType": 40,                         "resultTypeName": "不适用"                     }                 ],                 "complete": True             }         ],         "resultFiles": [],         "taskAreaId": 4638,         "taskId": 2705     }      submit_Value = requests.post(url=url,                                  headers=header,                                  json=json)     print(submit_Value.json())  # 创建要请求的URL列表 urls = ["http://*******/api/v1.0/task/dailySubmit", "http://*******/api/v1.0/task/dailySubmit", "http://*******/api/v1.0/task/dailySubmit"]  # 创建线程列表 threads = []  # 创建并启动线程 for url in urls:     thread = threading.Thread(target=submit_xun, args=(url,))     thread.start()     # threads.append(thread)  # 等待所有线程执行完毕 for thread in threads:     thread.join() 

方法二

import threading import requests   def login_test(url):     data = {         "contract_person_code": "240729000003",         "energy_details": [             {                 "account_subject_code": "10503",                 "account_subject_name": "热水费",                 "initial_readings": 0,                 "end_readings": 0,                 "is_auto": 0,                 "uuid": "e68b9d79-d874-43ea-ba27-49cec19e849f",                 "room_power_type_code": 2,                 "room_power_type_name": "手动抄表",                 "balance_price": 0,                 "photo": None,                 "energy_record_type": 2,                 "price": None,                 "amount_balance": None,                 "sum_consume_price": 0,                 "share_coefficient": 1,                 "consume_unit_price": 10,                 "is_need_photo": False             },             {                 "account_subject_code": "10501",                 "account_subject_name": "电费",                 "initial_readings": 0,                 "end_readings": 0,                 "is_auto": 0,                 "uuid": "666ae3a6-8758-4681-bc0e-6a7a37e8d646",                 "room_power_type_code": 2,                 "room_power_type_name": "手动抄表",                 "balance_price": 0,                 "photo": None,                 "energy_record_type": 2,                 "price": None,                 "amount_balance": None,                 "sum_consume_price": 0,                 "share_coefficient": 1.1,                 "consume_unit_price": 1.25,                 "is_need_photo": False             },             {                 "account_subject_code": "10502",                 "account_subject_name": "冷水费",                 "initial_readings": 0,                 "end_readings": 0,                 "is_auto": 0,                 "uuid": "b4c9ff95-e39b-4ea1-a68f-d185a81447c3",                 "room_power_type_code": 2,                 "room_power_type_name": "手动抄表",                 "balance_price": 0,                 "photo": None,                 "energy_record_type": 2,                 "price": None,                 "amount_balance": None,                 "sum_consume_price": 0,                 "share_coefficient": 1,                 "consume_unit_price": 9.5,                 "is_need_photo": False             },             {                 "account_subject_code": "10508",                 "account_subject_name": "燃气费",                 "initial_readings": 0,                 "end_readings": 0,                 "is_auto": 0,                 "uuid": "04f2a1a1-ca66-4bb0-922a-eb46f4268c6e",                 "room_power_type_code": 2,                 "room_power_type_name": "手动抄表",                 "balance_price": 0,                 "photo": None,                 "energy_record_type": 2,                 "price": None,                 "amount_balance": None,                 "sum_consume_price": 0,                 "share_coefficient": 1,                 "consume_unit_price": 10,                 "is_need_photo": False             }         ],         "reason_type": 1,         "reason": "周边环境",         "reason_code": "2",         "files": [],         "goods_details": [],         "new_store_code": "00182",         "change_coupon_detail_entities": [          ]     }     # url = "http://***/api/v4.0/ContractChange/SaveStoreChangePerson"      headers = {         "token":"Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJtb2ZhbmciLCJpc3MiOiJzc28iLCJpZCI6IjAyOD"     }     res = requests.post(url=url, json=data, headers=headers)     print(res.json())      # res_obj = res.json()["data"]["token"]     # return res_obj     # print(res_obj)   # 创建要请求的URL列表 urls = ["http://****/api/v4.0/ContractChange/SaveStoreChangePerson",         "http://****/api/v4.0/ContractChange/SaveStoreChangePerson",         "http://****/api/v4.0/ContractChange/SaveStoreChangePerson"]  # 创建线程列表 threads = []  # 创建并启动线程 for url in urls:     thread = threading.Thread(target=login_test, args=(url,))     thread.start()     # threads.append(thread)  # 等待所有线程执行完毕 for thread in threads:     thread.join()  # 等待所有线程执行完毕 # for thread in threads:

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!