# 使用操作器
操作器代表一个理想情况下是幂等的任务。 操作员确定DAG运行时实际执行的内容。
有关更多信息,请参阅[Operators Concepts](https://apachecn.github.io/airflow-doc-zh/concepts.html)文档和[Operators API Reference](https://apachecn.github.io/airflow-doc-zh/code.html) 。
* [BashOperator](9)
* [模板](9)
* [故障排除](9)
* [找不到Jinja模板](9)
* [PythonOperator](9)
* [传递参数](9)
* [模板](9)
* [Google云端平台运营商](9)
* [GoogleCloudStorageToBigQueryOperator](9)
## [BashOperator](9)
使用[`BashOperator`](https://apachecn.github.io/airflow-doc-zh/code.html "airflow.operators.bash_operator.BashOperator")在[Bash](https://www.gnu.org/software/bash/) shell中执行命令。
```
run_this = BashOperator (
task_id = 'run_after_loop' , bash_command = 'echo 1' , dag = dag )
```
### [模板](9)
您可以使用[Jinja模板](https://apachecn.github.io/airflow-doc-zh/concepts.html)来参数化`bash_command`参数。
```
task = BashOperator (
task_id = 'also_run_this' ,
bash_command = 'echo "run_id={{ run_id }} | dag_run={{ dag_run }}"' ,
dag = dag )
```
### [故障排除](9)
#### [找不到Jinja模板](9)
在使用`bash_command`参数直接调用Bash脚本时,在脚本名称后添加空格。 这是因为Airflow尝试将Jinja模板应用于它,这将失败。
```
t2 = BashOperator (
task_id = 'bash_example' ,
# This fails with `Jinja template not found` error
# bash_command="/home/batcher/test.sh",
# This works (has a space after)
bash_command = "/home/batcher/test.sh " ,
dag = dag )
```
## [PythonOperator](9)
使用[`PythonOperator`](https://apachecn.github.io/airflow-doc-zh/code.html "airflow.operators.python_operator.PythonOperator")执行Python callables。
```
def print_context ( ds , ** kwargs ):
pprint ( kwargs )
print ( ds )
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator (
task_id = 'print_the_context' ,
provide_context = True ,
python_callable = print_context ,
dag = dag )
```
### [传递参数](9)
使用`op_args`和`op_kwargs`参数将其他参数传递给Python可调用对象。
```
def my_sleeping_function ( random_base ):
"""This is a function that will run within the DAG execution"""
time . sleep ( random_base )
# Generate 10 sleeping tasks, sleeping from 0 to 4 seconds respectively
for i in range ( 5 ):
task = PythonOperator (
task_id = 'sleep_for_' + str ( i ),
python_callable = my_sleeping_function ,
op_kwargs = { 'random_base' : float ( i ) / 10 },
dag = dag )
task . set_upstream ( run_this )
```
### [模板](9)
当您将`provide_context`参数设置为`True` ,Airflow会传入一组额外的关键字参数:一个用于每个[Jinja模板变量](https://apachecn.github.io/airflow-doc-zh/code.html)和一个`templates_dict`参数。
`templates_dict`参数是模板化的,因此字典中的每个值都被评估为[Jinja模板](https://apachecn.github.io/airflow-doc-zh/concepts.html) 。
## [Google云端平台运营商](9)
### [GoogleCloudStorageToBigQueryOperator](9)
使用[`GoogleCloudStorageToBigQueryOperator`](https://apachecn.github.io/airflow-doc-zh/integration.html "airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator")执行BigQuery加载作业。
```
load_csv = gcs_to_bq . GoogleCloudStorageToBigQueryOperator (
task_id = 'gcs_to_bq_example' ,
bucket = 'cloud-samples-data' ,
source_objects = [ 'bigquery/us-states/us-states.csv' ],
destination_project_dataset_table = 'airflow_test.gcs_to_bq_table' ,
schema_fields = [
{ 'name' : 'name' , 'type' : 'STRING' , 'mode' : 'NULLABLE' },
{ 'name' : 'post_abbr' , 'type' : 'STRING' , 'mode' : 'NULLABLE' },
],
write_disposition = 'WRITE_TRUNCATE' ,
dag = dag )
```