使用PythonOperator在Airflow中调用API接口
发布时间:2024-01-04 09:18:54
在Airflow中使用PythonOperator调用API接口,可以使用Python中的requests库进行API的调用。下面是一个使用PythonOperator调用API接口的示例:
from datetime import datetime
import requests
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def call_api():
api_url = 'https://api.example.com/data'
response = requests.get(api_url)
if response.status_code == 200:
data = response.json()
# 处理返回的数据
process_data(data)
else:
print(f'API请求失败,状态码:{response.status_code}')
def process_data(data):
# 对返回的数据进行处理,这里只是简单打印出来
for item in data:
print(item)
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1)
}
dag = DAG(
'call_api_dag',
default_args=default_args,
schedule_interval='@daily',
catchup=False
)
call_api_task = PythonOperator(
task_id='call_api_task',
python_callable=call_api,
dag=dag
)
call_api_task
在上述例子中,我们定义了一个名为call_api的Python函数,该函数使用requests库向API接口发送GET请求。如果请求成功,会使用response.json()获取返回的JSON数据,然后调用process_data函数处理数据。如果请求失败,会打印错误信息。
然后,我们定义了一个名为process_data的Python函数,该函数对返回的数据进行处理,这里仅仅是简单地打印出来。
接下来,我们创建了一个DAG实例,命名为call_api_dag。DAG中的任务将在每天运行一次,使用PythonOperator创建了一个名为call_api_task的任务,该任务会调用call_api函数。
最后,我们将call_api_task添加到DAG中。
当DAG运行时,call_api_task任务将被执行,从而调用API接口并处理返回的数据。
需要注意的是,上述的例子只是简单地示范了在Airflow中使用PythonOperator调用API接口的过程,并处理了一些简单的返回数据。在实际应用中,通常还需要对API的调用结果进行进一步处理和错误处理,以及将结果保存到数据库或其他存储介质中。
