欢迎访问宙启技术站
智能推送

使用PythonOperator在Airflow中调用API接口

发布时间:2024-01-04 09:18:54

在Airflow中使用PythonOperator调用API接口,可以使用Python中的requests库进行API的调用。下面是一个使用PythonOperator调用API接口的示例:

from datetime import datetime
import requests
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def call_api():
    api_url = 'https://api.example.com/data'
    response = requests.get(api_url)
    if response.status_code == 200:
        data = response.json()
        # 处理返回的数据
        process_data(data)
    else:
        print(f'API请求失败,状态码:{response.status_code}')

def process_data(data):
    # 对返回的数据进行处理,这里只是简单打印出来
    for item in data:
        print(item)

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1)
}

dag = DAG(
    'call_api_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

call_api_task = PythonOperator(
    task_id='call_api_task',
    python_callable=call_api,
    dag=dag
)

call_api_task

在上述例子中,我们定义了一个名为call_api的Python函数,该函数使用requests库向API接口发送GET请求。如果请求成功,会使用response.json()获取返回的JSON数据,然后调用process_data函数处理数据。如果请求失败,会打印错误信息。

然后,我们定义了一个名为process_data的Python函数,该函数对返回的数据进行处理,这里仅仅是简单地打印出来。

接下来,我们创建了一个DAG实例,命名为call_api_dag。DAG中的任务将在每天运行一次,使用PythonOperator创建了一个名为call_api_task的任务,该任务会调用call_api函数。

最后,我们将call_api_task添加到DAG中。

当DAG运行时,call_api_task任务将被执行,从而调用API接口并处理返回的数据。

需要注意的是,上述的例子只是简单地示范了在Airflow中使用PythonOperator调用API接口的过程,并处理了一些简单的返回数据。在实际应用中,通常还需要对API的调用结果进行进一步处理和错误处理,以及将结果保存到数据库或其他存储介质中。