import schedule
import time
import pyodbc
import pandas as pd
from datetime import datetime, timedelta
import warnings
import logging

# Source database connections (one entry per branch)
source_databases = [
    {
        'database_name': '',
        'server': '',
        'database': '',
        'username': '',
        'password': '',
        'branch_id': 0  # branch ID
    },
]

# Target database configuration
target_database = {
    'database_name': '',
    'server': '',
    'database': '',
    'username': '',
    'password': ''
}

# Tables to process, with their unique-key, date, and branch-ID columns
tables = {
    'cmis_patientinfo': {'unique_field': '唯一标识', 'date_field': '日期字段', 'fendian_field': '分店ID'},
    'cmis_yuyue': {'unique_field': '唯一标识', 'date_field': '日期字段', 'fendian_field': '分店ID'},
}


# Open a database connection
def connect_to_db(config):
    connection_string = (
        f"DRIVER={{ODBC Driver 17 for SQL Server}};"
        f"SERVER={config['server']};DATABASE={config['database']};"
        f"UID={config['username']};PWD={config['password']}"
    )
    return pyodbc.connect(connection_string)


# Stamp every row with the branch it came from
def process_data(df, branch_id, fendian_field):
    df[fendian_field] = branch_id  # set the branch ID dynamically
    return df


# Fetch yesterday's rows that have not been uploaded yet
def get_yesterday_data(connection, table, date_field):
    # Yesterday at 00:00:00
    yesterday_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=1)
    # Yesterday at 23:59:59
    yesterday_end = yesterday_start + timedelta(hours=23, minutes=59, seconds=59)
    # Only rows whose upload flag is not 5 (or is NULL) still need syncing
    query = f"SELECT * FROM {table} WHERE {date_field} BETWEEN ? AND ? AND (upload != 5 OR upload IS NULL)"
    # Silence the pandas UserWarning about non-SQLAlchemy connections
    warnings.filterwarnings('ignore', category=UserWarning, message="pandas only supports SQLAlchemy connectable")
    return pd.read_sql(query, connection, params=[yesterday_start, yesterday_end])


# Get the column names of the target table
def get_target_columns(connection, table):
    cursor = connection.cursor()
    cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table}'")
    columns = [row.COLUMN_NAME for row in cursor.fetchall()]
    cursor.close()
    return columns


# Convert pandas values into types pyodbc can bind
def convert_data_types(row):
    new_row = []
    for value in row:
        if pd.isnull(value):
            new_row.append(None)
        elif isinstance(value, pd.Timestamp):
            new_row.append(value.to_pydatetime())
        else:
            new_row.append(value)
    return tuple(new_row)


# Insert rows into the target database and mark them as uploaded on both sides
def insert_data_to_target(source_connection, target_connection, table, unique_field, df, db_config):
    print(f"{time.ctime()} —— {db_config['database_name']}: table {table}: inserting data...")
    source_cursor = source_connection.cursor()
    target_cursor = target_connection.cursor()

    target_columns = get_target_columns(target_connection, table)
    df_columns = df.columns.tolist()
    # Keep only the columns that also exist in the target table
    common_columns = [col for col in df_columns if col in target_columns]

    columns = ', '.join(common_columns)
    placeholders = ', '.join(['?' for _ in common_columns])
    insert_query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
    update_source_query = f"UPDATE {table} SET upload = 5 WHERE {unique_field} = ?"
    update_target_query = f"UPDATE {table} SET upload = 5 WHERE {unique_field} = ?"

    success_count = 0
    failure_count = 0
    error = ''
    for index, row in df.iterrows():
        values = convert_data_types(row[common_columns])
        try:
            target_cursor.execute(insert_query, values)
            target_connection.commit()

            unique_value = row[unique_field]
            # Mark the row as uploaded (upload = 5) in the source database
            source_cursor.execute(update_source_query, unique_value)
            source_connection.commit()
            # Mark the row as uploaded (upload = 5) in the target database
            target_cursor.execute(update_target_query, unique_value)
            target_connection.commit()
            success_count += 1
        except Exception as e:
            failure_count += 1
            error = e
            target_connection.rollback()
            source_connection.rollback()

    source_cursor.close()
    target_cursor.close()

    message = (f"{time.ctime()} —— {db_config['database_name']}: table {table}: "
               f"{success_count} rows inserted, {failure_count} rows failed")
    if failure_count > 0:
        message += f", last error: {error}"
    print(message)


# Main task: sync yesterday's data from every source database into the target
def main_task():
    for db_config in source_databases:
        source_conn = None
        target_conn = None
        try:
            # Connect to the source and target databases
            source_conn = connect_to_db(db_config)
            print(f"{time.ctime()} —— {db_config['database_name']}: database connection established!")
            target_conn = connect_to_db(target_database)
            print(f"{time.ctime()} —— {target_database['database_name']}: database connection established!")

            for table, fields in tables.items():
                unique_field = fields['unique_field']
                date_field = fields['date_field']
                fendian_field = fields['fendian_field']
                try:
                    df = get_yesterday_data(source_conn, table, date_field)
                    if not df.empty:
                        processed_df = process_data(df, db_config['branch_id'], fendian_field)
                        insert_data_to_target(source_conn, target_conn, table, unique_field, processed_df, db_config)
                    else:
                        print(f"{time.ctime()} —— {db_config['database_name']}: table {table}: no pending data from yesterday!")
                except Exception as e:
                    print(f"{time.ctime()} —— {db_config['database_name']}: table {table}: failed to process data! error: {e}")
        except Exception as e:
            logging.error(f"{time.ctime()} —— {db_config['database_name']}: database connection failed! error: {e}")
            continue
        finally:
            try:
                if source_conn is not None:
                    source_conn.close()
                    print(f"{time.ctime()} —— closed database connection {db_config['database_name']}")
            except Exception as e:
                logging.error(f"{time.ctime()} —— error while closing connection {db_config['database_name']}: {e}")
            try:
                if target_conn is not None:
                    target_conn.close()
                    print(f"{time.ctime()} —— closed database connection {target_database['database_name']}")
            except Exception as e:
                logging.error(f"{time.ctime()} —— error while closing connection {target_database['database_name']}: {e}")


# Scheduled task: run the sync every day at 03:00
schedule.every().day.at("03:00").do(main_task)

while True:
    schedule.run_pending()
    time.sleep(40)
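# Optional (a minimal sketch, not part of the original script): main_task() reports
# connection and close errors through logging.error(), but no handler is configured,
# so those messages only fall back to stderr. Adding something like the following near
# the top of the script would also persist them to a file (the file name and format
# below are assumptions, not values from the original):
#
#   logging.basicConfig(filename='sync_errors.log', level=logging.ERROR,
#                       format='%(asctime)s %(levelname)s %(message)s')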