ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
# 数据血缘(Lineage) > 贡献者:[@morefreeze](https://github.com/morefreeze) > 注意: > Lineage 支持是实验性的,可能随时会发生变化。 Airflow 可以帮助跟踪数据的来源,以及数据发生了什么变化。 这有助于实现审计跟踪和数据治理,还可以调试数据流。 Airflow 通过任务的 inlets 和 outlets 跟踪数据。 让我们通过一个例子看看它是如何工作的。 ```py from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator from airflow.lineage.datasets import File from airflow.models import DAG from datetime import timedelta FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"] args = { 'owner': 'airflow' , 'start_date': airflow.utils.dates.days_ago(2) } dag = DAG( dag_id='example_lineage', default_args=args, schedule_interval='0 0 * * *', dagrun_timeout=timedelta(minutes=60)) f_final = File("/tmp/final") run_this_last = DummyOperator(task_id='run_this_last', dag=dag, inlets={"auto": True}, outlets={"datasets": [f_final,]}) f_in = File("/tmp/whole_directory/") outlets = [] for file in FILE_CATEGORIES: f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file)) outlets.append(f_out) run_this = BashOperator( task_id='run_me_first', bash_command='echo 1', dag=dag, inlets={"datasets": [f_in,]}, outlets={"datasets": outlets} ) run_this.set_downstream(run_this_last) ``` 任务定义了参数`inlets`和`outlets`。 `inlets`可以是一个数据集列表`{"datesets":[dataset1,dataset2]}`,也可以是指定的上游任务`outlets`像这样`{"task_ids":["task_id1","task_id2"]}`,或者不想指定直接用`{"auto":True}`也可以,甚至是前面几种的组合。 `outlets` 也是一个数据集列表`{"datesets":[dataset1,dataset2]}`。 在运行任务时,数据集的字段会被模板渲染。 > 注意: > 只要 Operator 支持,它会自动地加上 inlets 和 outlets。 在示例 DAG 任务中, `run_me_first`是一个 BashOperator,它接收`CAT1`, `CAT2`, `CAT3`作为 inlets(译注:根据代码,应为“输出 outlets”)。 其中的`execution_date`会在任务运行时被渲染成执行时间。 > 注意: > 在底层,Airflow 会在`pre_execute`方法中准备 lineage 元数据。 当任务运行结束时,会调用`post_execute`将 lineage 元数据推送到 XCOM 中。 因此,如果您要创建自己的 Operator,并且需要覆写这些方法,确保分别用`prepare_lineage`和`apply_lineage`装饰这些方法。 ## Apache Atlas Airflow 可以将 lineage 元数据发送到 Apache Atlas。 您需要在`airflow.cfg`中配置`atlas`: ```config [lineage] backend = airflow.lineage.backend.atlas [atlas] username = my_username password = my_password host = host port = 21000 ``` 请确保已经安装了`atlasclient`。