如何在Airflow中编写执行MySQL查询的任务

avatar
作者
猴君
阅读量:0

在Airflow中编写执行MySQL查询的任务可以通过使用PythonOperator来执行查询的Python函数。以下是一个简单的示例:

from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.mysql_operator import MySqlOperator from datetime import datetime  default_args = {     'owner': 'airflow',     'depends_on_past': False,     'start_date': datetime(2021, 1, 1),     'email_on_failure': False,     'email_on_retry': False,     'retries': 1 }  dag = DAG('mysql_query_dag', default_args=default_args, schedule_interval='@daily')  def execute_mysql_query():     # 连接MySQL数据库     conn = MySQLdb.connect(host="localhost", user="root", passwd="password", db="database")     cursor = conn.cursor()          # 执行查询     cursor.execute("SELECT * FROM table")          # 获取结果     rows = cursor.fetchall()     for row in rows:         print(row)          # 关闭连接     conn.close()  mysql_task = PythonOperator(     task_id='execute_mysql_query',     python_callable=execute_mysql_query,     dag=dag )  mysql_task 

在这个例子中,首先创建了一个DAG,并定义了一个Python函数execute_mysql_query,该函数连接到MySQL数据库,执行查询并打印结果。然后使用PythonOperator来执行这个函数,并将其添加到DAG中。当DAG运行时,该任务将连接到MySQL数据库执行查询。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!