# couler **Repository Path**: easy_and_easy/couler ## Basic Information - **Project Name**: couler - **Description**: Unified Interface for Constructing and Managing Workflows - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-01-27 - **Last Updated**: 2021-01-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README [](https://github.com/couler-proj/couler/actions?query=event%3Apush+branch%3Amaster) # Couler ## What is Couler? Couler aims to provide a unified interface for constructing and managing workflows on different workflow engines, such as [Argo Workflows](https://github.com/argoproj/argo), [Tekton Pipelines](https://tekton.dev/), and [Apache Airflow](https://airflow.apache.org/). ## Why use Couler? Many workflow engines exist nowadays, e.g. [Argo Workflows](https://github.com/argoproj/argo), [Tekton Pipelines](https://tekton.dev/), and [Apache Airflow](https://airflow.apache.org/). However, their programming experience varies and they have different level of abstractions that are often obscure and complex. The code snippets below are some examples for constructing workflows using Apache Airflow and [Kubeflow Pipelines](https://github.com/kubeflow/pipelines/).
| Apache Airflow | Kubeflow Pipelines |
|---|---|
```python def create_dag(dag_id, schedule, dag_number, default_args): def hello_world_py(*args): print('Hello World') dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args) with dag: t1 = PythonOperator( task_id='hello_world', python_callable=hello_world_py, dag_number=dag_number) return dag for n in range(1, 10): default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1) } globals()[dag_id] = create_dag( 'hello_world_{}'.format(str(n)), '@daily', n, default_args) ``` |
```python class FlipCoinOp(dsl.ContainerOp): """Flip a coin and output heads or tails randomly.""" def __init__(self): super(FlipCoinOp, self).__init__( name='Flip', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import random; result = \'heads\' if random.randint(0,1) == 0 ' 'else \'tails\'; print(result)" | tee /tmp/output'], file_outputs={'output': '/tmp/output'}) class PrintOp(dsl.ContainerOp): """Print a message.""" def __init__(self, msg): super(PrintOp, self).__init__( name='Print', image='alpine:3.6', command=['echo', msg], ) # define the recursive operation @graph_component def flip_component(flip_result): print_flip = PrintOp(flip_result) flipA = FlipCoinOp().after(print_flip) with dsl.Condition(flipA.output == 'heads'): flip_component(flipA.output) @dsl.pipeline( name='pipeline flip coin', description='shows how to use graph_component.' ) def recursive(): flipA = FlipCoinOp() flipB = FlipCoinOp() flip_loop = flip_component(flipA.output) flip_loop.after(flipB) PrintOp('cool, it is over. %s' % flipA.output).after(flip_loop) ``` |