python实现把其他sql server数据库的某些表的前一天数据定时存储到自己数据库同名的表中

avatar
作者
猴君
阅读量:1
import schedule import time import pyodbc import pandas as pd from datetime import datetime, timedelta from sqlalchemy import create_engine, text import warnings import logging  # 配置数据库连接 source_databases = [     {         'database_name': '',         'server': '',         'database': '',         'username': '',         'password': '',         'branch_id': 0  # 分店ID     }, ]  # 目标数据库配置 target_database = {     'database_name': '',     'server': '',     'database': '',     'username': '',     'password': '' }  # 要处理的表及其唯一标识字段和日期字段 tables = {     'cmis_patientinfo': {'unique_field': '唯一标识', 'date_field': '日期字段', 'fendian_field': '分店ID'},     'cmis_yuyue': {'unique_field': '唯一标识', 'date_field': '日期字段', 'fendian_field': '分店ID'}, }   # 连接数据库 def connect_to_db(config):     connection_string = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={config['server']};DATABASE={config['database']};UID={config['username']};PWD={config['password']}"     return pyodbc.connect(connection_string)   # 处理数据 def process_data(df, branch_id, fendian_field):     df[fendian_field] = branch_id  # 动态更新分店ID     return df   # 获取前一天的数据 def get_yesterday_data(connection, table, date_field):     # 获取昨天的日期和时间(0点)     yesterday_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=1)     # 获取昨天的日期和时间(23:59:59)     yesterday_end = yesterday_start + timedelta(hours=23, minutes=59, seconds=59)      # 查询前一天的数据以包含 upload 不为 5 或为 NULL 的条件     query = f"SELECT * FROM {table} WHERE {date_field} BETWEEN ? AND ? AND (upload != 5 OR upload IS NULL)"      # 忽略pandas发出的特定UserWarning     warnings.filterwarnings('ignore', category=UserWarning,                             message="pandas only supports SQLAlchemy connectable")      return pd.read_sql(query, connection, params=[yesterday_start, yesterday_end])   # 获取目标表的列名 def get_target_columns(connection, table):     cursor = connection.cursor()     cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table}'")     columns = [row.COLUMN_NAME for row in cursor.fetchall()]     cursor.close()     return columns   # 数据类型转换函数 def convert_data_types(row):     new_row = []     for value in row:         if pd.isnull(value):             new_row.append(None)         elif isinstance(value, pd.Timestamp):             new_row.append(value.to_pydatetime())         else:             new_row.append(value)     return tuple(new_row)   # 插入数据到目标数据库并更新upload字段 def insert_data_to_target(source_connection, target_connection, table, unique_field, df, db_config):     print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 数据插入中...")     source_cursor = source_connection.cursor()     target_cursor = target_connection.cursor()      target_columns = get_target_columns(target_connection, table)     df_columns = df.columns.tolist()      # 过滤出目标表存在的列     common_columns = [col for col in df_columns if col in target_columns]      success_count = 0     failure_count = 0     error = ''      for index, row in df.iterrows():         columns = ', '.join(common_columns)         placeholders = ', '.join(['?' for _ in common_columns])         values = convert_data_types(row[common_columns])          insert_query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"          try:             target_cursor.execute(insert_query, values)             target_connection.commit()              unique_value = row[unique_field]             # 更新源数据库中 upload 字段为 5             update_source_query = f"UPDATE {table} SET upload = 5 WHERE {unique_field} = ?"             source_cursor.execute(update_source_query, unique_value)             source_connection.commit()             # 更新目标数据库中 upload 字段为 5             update_target_query = f"UPDATE {table} SET upload = 5 WHERE {unique_field} = ?"             target_cursor.execute(update_target_query, unique_value)             target_connection.commit()              success_count += 1         except Exception as e:             failure_count += 1             error = e             target_connection.rollback()             source_connection.rollback()      source_cursor.close()     target_cursor.close()      message = f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 插入成功 {success_count} 条, 插入失败 {failure_count} 条"      if failure_count > 0:         message += f", 失败原因: {error}"      print(message)     # print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 插入成功 {success_count} 条, 插入失败 {failure_count} 条")   # 主任务 def main_task():     source_conn = None     target_conn = None     for db_config in source_databases:         try:             # 连接数据库             source_conn = connect_to_db(db_config)             print(f"{time.ctime()} —— {db_config['database_name']}-数据库连接成功!")             target_conn = connect_to_db(target_database)             print(f"{time.ctime()} —— {target_database['database_name']}-数据库连接成功!")              for table, fields in tables.items():                 unique_field = fields['unique_field']                 date_field = fields['date_field']                 fendian_field = fields['fendian_field']                 try:                     df = get_yesterday_data(source_conn, table, date_field)                     if not df.empty:                         processed_df = process_data(df, db_config['branch_id'], fendian_field)                         insert_data_to_target(source_conn, target_conn, table, unique_field, processed_df, db_config)                         # print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 数据处理成功!")                     else:                         print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 暂无待处理的昨日数据!")                 except Exception as e:                     print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}:处理数据失败!error: {e}")         except Exception as e:             # print(f"{time.ctime()} —— {db_config['database_name']}-数据库连接失败! error: {e}")             logging.error(f"{time.ctime()} —— {db_config['database_name']}-数据库连接失败! error: {e}")             continue         finally:             try:                 if source_conn is not None:                     source_conn.close()                     print(f"{time.ctime()} —— 关闭数据库连接 {db_config['database_name']}")             except Exception as e:                 logging.error(f"{time.ctime()} —— 关闭 {db_config['database_name']}-数据库连接时出错: {e}")              try:                 if target_conn is not None:                     target_conn.close()                     print(f"{time.ctime()} —— 关闭数据库连接 {target_database['database_name']}")             except Exception as e:                 logging.error(f"{time.ctime()} —— 关闭 {target_database['database_name']}-数据库连接时出错: {e}")  # 定时任务 schedule.every().day.at("03:00").do(main_task)  while True:     schedule.run_pending()     time.sleep(40)  

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!