Combining Overland Flow and Simple Crop

Running Overland Flow on its Own

The overland flow model routes water over a landscape and determines how much water infiltrates particular cells in response to rain events and elevation data. Rainfall in this model is assumed to occur instantaneously and rainfall events are assumed to be independent (so a large rainfall event on a previous day has no bearing on the present day).
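The sketch below is a toy that makes those two assumptions concrete. It is not the algorithm overlandflow uses. It swaps in a simple single-direction (steepest-descent) routing scheme on a small grid, with a fixed per-cell infiltration capacity. The names route_event, route_events and capacity are hypothetical. Each event starts from a dry grid, which is what makes events independent.

```python
# Toy sketch, NOT the overlandflow model's algorithm: single-direction
# (steepest-descent) routing with a fixed per-cell infiltration capacity.


def route_event(elevation, rain_depth, capacity):
    """Route one rain event over a grid that starts dry.

    Returns (infiltrated, remaining): the depth infiltrated in each cell and
    the total excess left in cells with no lower neighbour (pits and outlets).
    """
    rows, cols = len(elevation), len(elevation[0])
    # Water reaching each cell: the rain itself plus run-on from upslope cells.
    arriving = [[rain_depth] * cols for _ in range(rows)]
    infiltrated = [[0.0] * cols for _ in range(rows)]
    remaining = 0.0
    # Highest cells first, so all run-on has arrived before a cell is processed.
    order = sorted(((r, c) for r in range(rows) for c in range(cols)),
                   key=lambda rc: elevation[rc[0]][rc[1]], reverse=True)
    for r, c in order:
        water = arriving[r][c]
        infiltrated[r][c] = min(water, capacity)
        excess = water - infiltrated[r][c]
        # 4-connected neighbours that lie inside the grid.
        neighbours = [(r + dr, c + dc)
                      for dr, dc in ((-1, 0), (1, 0), (0, -1), (0, 1))
                      if 0 <= r + dr < rows and 0 <= c + dc < cols]
        lower = [n for n in neighbours if elevation[n[0]][n[1]] < elevation[r][c]]
        if lower:
            # Send the excess to the lowest neighbour (steepest descent).
            nr, nc = min(lower, key=lambda n: elevation[n[0]][n[1]])
            arriving[nr][nc] += excess
        else:
            remaining += excess
    return infiltrated, remaining


def route_events(elevation, daily_rain, capacity):
    """Route each day's rain separately: nothing carries over between days."""
    return [route_event(elevation, depth, capacity)[0] for depth in daily_rain]
```

Take a three-cell slope [[3, 2, 1]] with 1.0 of rain and a capacity of 0.5. Every cell infiltrates 0.5, and the remaining 1.5 collects in the bottom cell. A following day with no rain infiltrates nothing, whatever fell the day before.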

To run the model, we first import some source and sink types, along with the classes used to call out to CLI models and build requests.

import itertools
import os.path
import netCDF4

from meillionen.interface.resource import Feather, NetCDF, OtherFile, Parquet
from meillionen.interface.schema import PandasHandler, NetCDFHandler
from meillionen.settings import Settings, Partitioning
from meillionen.client import Client, CLIRef
from prefect import task, Flow

BASE_DIR = '../../examples/crop-pipeline'
INPUT_DIR = os.path.join(BASE_DIR, 'workflows/inputs')
OUTPUT_DIR = os.path.join(BASE_DIR, 'workflows/outputs')

# run the models as Python scripts
# (for Binder compatibility only, since Binder does not support poetry yet)
OVERLANDFLOW = os.path.join(BASE_DIR, 'overlandflow/overlandflow_omf/cli.py')
SIMPLECROP = os.path.join(BASE_DIR, 'simplecrop/simplecrop_omf/cli.py')
ov_path = os.path.realpath(os.path.join(BASE_DIR, 'overlandflow'))
sc_path = os.path.realpath(os.path.join(BASE_DIR, 'simplecrop'))
# let the CLI scripts import their model packages
os.environ['PYTHONPATH'] = os.pathsep.join([ov_path, sc_path])

# alternatively, run the models as installed Python packages
# (install the simplecrop and overlandflow packages with poetry first)
# OVERLANDFLOW = "overlandflow-omf"
# SIMPLECROP = "simplecrop-omf"

settings = Settings(
    base_path=OUTPUT_DIR
)

Then we create a client to call our model with.

overlandflow = Client(CLIRef(OVERLANDFLOW), settings=settings)
overlandflow
<meillionen.client.Client at 0x7f3e703ec5e0>

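Next, call the overland flow model's run method. We pass the elevation and weather inputs, plus a partial NetCDF resource for the soil_water_infiltration__depth output. Running the model creates files on the file system.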

overland_payloads = overlandflow.run(
    class_name='overlandflow',
    method_name='run',
    resource_payloads={
        'elevation': OtherFile(os.path.join(INPUT_DIR, 'elevation.asc')),
        'weather': Feather(os.path.join(INPUT_DIR, 'weather.feather')),
        'soil_water_infiltration__depth': NetCDF.partial('swid')
    }
)

Running Simple Crop on its Own

simplecrop is a command-line model of yearly crop growth. When it is called with no arguments, it expects a data folder in the current directory containing five files:

  • irrig.inp - daily irrigation

  • plant.inp - plant growth parameters for the simulation

  • simctrl.inp - simulation reporting parameters

  • soil.inp - soil characteristic parameters for the simulation

  • weather.inp - daily weather data (with variables like maximum temperature, solar energy flux)

simplecrop also expects an output folder. After a run, it populates the output folder with three files:

  • plant.out - daily plant characteristics

  • soil.out - daily soil characteristics

  • wbal.out - summary soil and plant statistics about the simulation
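simplecrop reads its inputs from data and writes its results to output. Because of that, it can help to check this folder contract before and after a run. The sketch below relies only on the folder and file names listed above. missing_inputs, missing_outputs and prepare_workdir are hypothetical helpers, not part of simplecrop or meillionen.

```python
from pathlib import Path

# Folder and file names come from the lists above; the helpers are hypothetical.
REQUIRED_INPUTS = ("irrig.inp", "plant.inp", "simctrl.inp", "soil.inp", "weather.inp")
EXPECTED_OUTPUTS = ("plant.out", "soil.out", "wbal.out")


def _missing(folder, names):
    """Return the entries of `names` that are not regular files inside `folder`."""
    return [name for name in names if not (Path(folder) / name).is_file()]


def missing_inputs(workdir):
    """Inputs simplecrop needs in <workdir>/data before it is run."""
    return _missing(Path(workdir) / "data", REQUIRED_INPUTS)


def missing_outputs(workdir):
    """Outputs simplecrop should have written to <workdir>/output after a run."""
    return _missing(Path(workdir) / "output", EXPECTED_OUTPUTS)


def prepare_workdir(workdir):
    """Fail early on missing inputs and create the output folder simplecrop expects."""
    missing = missing_inputs(workdir)
    if missing:
        raise FileNotFoundError("missing simplecrop inputs: " + ", ".join(missing))
    (Path(workdir) / "output").mkdir(exist_ok=True)
```

Call prepare_workdir before invoking the model and missing_outputs afterwards. A missing file then surfaces as a clear error instead of a confusing parse failure later.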

We want to wrap this model in an interface that lets you run it without manually building those input files or converting the output files into a format suited to analysis. That requires a wrapper that constructs the input files and parses the output files. Fortunately, we already have such a model wrapper.
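The pattern such a wrapper follows can be sketched with the standard library alone. It stages the inputs in a scratch directory, runs the model there, and reads back whatever the model wrote to output. This is an illustrative sketch, not the meillionen wrapper. run_in_scratch_dir is a hypothetical name, and a real wrapper would also parse the .out files into tables.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_in_scratch_dir(command, inputs):
    """Hypothetical wrapper: stage inputs, run `command`, collect outputs.

    `inputs` maps file names (e.g. "weather.inp") to their text contents.
    Returns a dict mapping each file written to output/ to its text.
    """
    with tempfile.TemporaryDirectory() as workdir:
        work = Path(workdir)
        (work / "data").mkdir()
        (work / "output").mkdir()
        for name, text in inputs.items():
            (work / "data" / name).write_text(text)
        # Run with the scratch directory as the working directory, matching
        # the "data folder in the current directory" convention.
        subprocess.run(command, cwd=work, check=True)
        return {p.name: p.read_text() for p in sorted((work / "output").iterdir())}


# Stand-in "model" so the sketch runs without simplecrop installed:
# it just copies one input file to an output file.
fake_model = [sys.executable, "-c",
              "import shutil; shutil.copy('data/weather.inp', 'output/wbal.out')"]
print(run_in_scratch_dir(fake_model, {"weather.inp": "1 0.5\n"}))
```

Passing check=True means a non-zero exit status from the model raises subprocess.CalledProcessError. Without it, the wrapper would silently return incomplete outputs.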

import pandas as pd

simple_crop = Client(CLIRef(SIMPLECROP), settings=settings)
simple_crop_response = simple_crop.run(
    class_name='simplecrop',
    method_name='run',
    resource_payloads={
        'daily': Feather(os.path.join(BASE_DIR, 'simplecrop/data/daily.feather')),
        'yearly': Feather(os.path.join(BASE_DIR, 'simplecrop/data/yearly.feather')),
        'plant': Feather.partial(),
        'soil': Feather.partial(),
        'raw': OtherFile.partial(ext='')
    })
soil_df = simple_crop_response.load('soil')
soil_df
day_of_year soil_daily_runoff soil_daily_infiltration soil_daily_drainage soil_evapotranspiration soil_evaporation plant_potential_transpiration soil_water_storage_depth soil_water_profile_ratio soil_water_deficit_stress soil_water_excess_stress
0 3 0.0 0.0 1.86 2.25 2.23 0.02 260.97 1.800 1.000 1.0
1 6 0.0 0.0 2.25 2.64 2.62 0.02 264.09 1.821 1.000 1.0
2 9 0.0 0.0 0.94 1.32 1.31 0.01 253.64 1.749 1.000 1.0
3 12 0.0 0.0 0.57 2.56 2.53 0.02 249.09 1.718 1.000 1.0
4 15 0.0 0.0 0.00 1.94 1.89 0.02 241.54 1.666 1.000 1.0
... ... ... ... ... ... ... ... ... ... ... ...
94 282 0.0 0.0 0.00 4.04 0.61 1.55 154.76 1.067 0.566 1.0
95 285 0.0 0.0 0.00 1.64 0.26 0.56 152.05 1.049 0.544 1.0
96 288 0.0 0.0 0.00 2.92 0.46 0.88 146.65 1.011 0.499 1.0
97 291 0.0 0.0 0.00 4.84 0.75 1.25 140.73 0.971 0.449 1.0
98 293 0.0 0.0 0.00 4.18 0.65 0.95 137.47 0.948 0.422 1.0

99 rows × 11 columns

Using Prefect to Feed Overland Flow Results into Simple Crop

Now we’ll combine overlandflow with simplecrop. This requires an adapter that augments the daily source data fed into simplecrop with the depth of water that has infiltrated each cell. In the code below, that depth overwrites the rainfall column. The workflow is shown below without the details of calling simplecrop for each (x, y) coordinate in the map.

[Figure: workflow]
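The adapter step itself is small. Here is a stdlib-only sketch in which adapt_daily is a hypothetical name and the daily rows are plain dicts rather than a DataFrame. It mirrors what daily_df.assign(rainfall=...) does in the flow below. Each cell gets its own copy of the daily data, with rainfall set to that cell's infiltration depth, and the series must have exactly one value per day.

```python
def adapt_daily(daily_rows, infiltration_depths):
    """Hypothetical adapter: give one map cell its own daily series.

    Like DataFrame.assign, this returns new rows and leaves the input
    untouched; any existing 'rainfall' value is overwritten.
    """
    if len(daily_rows) != len(infiltration_depths):
        raise ValueError(
            f"{len(daily_rows)} daily rows but {len(infiltration_depths)} depths")
    return [{**row, "rainfall": depth}
            for row, depth in zip(daily_rows, infiltration_depths)]
```

pandas applies the same length rule: assigning an array of the wrong length to a column raises a ValueError.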

import pyarrow as pa
import pandas as pd
from meillionen.interface.base import MethodRequestArg

# keep this run's outputs under a separate "simplecrop-parallelism" trial
trial = settings.trial("simplecrop-parallelism")

overlandflow = Client(CLIRef(OVERLANDFLOW), settings=trial)

# simplecrop outputs are partitioned by grid cell (x, y)
simplecrop_partitioning = Partitioning(
    pa.schema([("x", pa.int32()), ("y", pa.int32())]))

simple_crop = Client(CLIRef(SIMPLECROP), settings=trial)
daily_df = pd.read_feather(os.path.join(BASE_DIR, 'simplecrop/data/daily.feather'))
yearly = Feather(os.path.join(BASE_DIR, 'simplecrop/data/yearly.feather'))


@task()
def run_overland_flow():
    """Run overland flow once and return the infiltration depth grid."""
    partial_resource_payloads = {
        'elevation': OtherFile(os.path.join(INPUT_DIR, 'elevation.asc')),
        'weather': Feather(os.path.join(INPUT_DIR, 'weather.feather')),
        'soil_water_infiltration__depth': NetCDF.partial(variable='swid')
    }

    response = overlandflow.run(
        class_name='overlandflow',
        method_name='run',
        resource_payloads=partial_resource_payloads
    )

    return response.load('soil_water_infiltration__depth')


@task()
def chunkify_soil_water_infiltration_depth(swid):
    """Split the grid into one chunk per cell of a 3 x 3 window (x 10-12, y 21-23)."""
    return [{'soil_water_infiltration__depth': swid[x, y, :], 'x': x, 'y': y}
            for (x, y) in itertools.product(range(10, 13), range(21, 24))]


@task()
def simplecrop_process_chunk(data):
    """Run simplecrop for one cell, using its infiltration depth as rainfall."""
    soil_water_infiltration__depth = data['soil_water_infiltration__depth']
    x, y = data['x'], data['y']
    partition = simplecrop_partitioning.complete(x=x, y=y)
    # adapter: set the daily rainfall to this cell's infiltration depth
    daily_xy_df = daily_df.assign(rainfall=soil_water_infiltration__depth)
    # save the adapted daily data as this partition's 'daily' input
    daily = simple_crop.save(
        mra=MethodRequestArg(class_name='simplecrop', method_name='run', arg_name='daily'),
        resource=Feather.partial(),
        data=daily_xy_df,
        partition=partition)

    resource_payloads = {
        'daily': daily,
        'yearly': yearly,
        'plant': Parquet.partial(),
        'soil': Parquet.partial(),
        'raw': OtherFile.partial(ext='')
    }

    # outputs are written to the trial's partitioned output directory, not returned
    response = simple_crop.run(
        class_name='simplecrop',
        method_name='run',
        resource_payloads=resource_payloads,
        partition=partition)


with Flow('crop_pipeline') as flow:
    overland_flow = run_overland_flow()
    surface_water_depth_chunks = chunkify_soil_water_infiltration_depth(overland_flow)
    # .map runs simplecrop_process_chunk once per chunk
    yield_chunks = simplecrop_process_chunk.map(surface_water_depth_chunks)

flow.run()
[2021-10-08 14:53:22-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'crop_pipeline'
[2021-10-08 14:53:22-0700] INFO - prefect.TaskRunner | Task 'run_overland_flow': Starting task run...
[2021-10-08 14:53:27-0700] INFO - prefect.TaskRunner | Task 'run_overland_flow': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:27-0700] INFO - prefect.TaskRunner | Task 'chunkify_soil_water_infiltration_depth': Starting task run...
[2021-10-08 14:53:27-0700] INFO - prefect.TaskRunner | Task 'chunkify_soil_water_infiltration_depth': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:27-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk': Starting task run...
[2021-10-08 14:53:27-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk': Finished task run for task with final state: 'Mapped'
[2021-10-08 14:53:27-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[0]': Starting task run...
[2021-10-08 14:53:28-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[0]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:28-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[1]': Starting task run...
[2021-10-08 14:53:30-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[1]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:30-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[2]': Starting task run...
[2021-10-08 14:53:31-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[2]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:31-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[3]': Starting task run...
[2021-10-08 14:53:33-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[3]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:33-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[4]': Starting task run...
[2021-10-08 14:53:34-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[4]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:34-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[5]': Starting task run...
[2021-10-08 14:53:36-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[5]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:36-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[6]': Starting task run...
[2021-10-08 14:53:37-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[6]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:37-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[7]': Starting task run...
[2021-10-08 14:53:39-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[7]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:39-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[8]': Starting task run...
[2021-10-08 14:53:40-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[8]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:40-0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
<Success: "All reference tasks succeeded.">
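The flow fans out in two steps. First, chunkify_soil_water_infiltration_depth turns the grid into one chunk per cell of a 3 × 3 window (x from 10 to 12, y from 21 to 23). Then simplecrop_process_chunk.map runs the task once per chunk, which is why the log shows nine mapped runs, [0] to [8]. The timestamps show these runs happening one after another under flow.run(). The same fan-out can be sketched with the standard library, with concurrent.futures.ThreadPoolExecutor standing in for Prefect's mapping. chunkify, process_chunk and run_mapped are hypothetical names.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor


def chunkify(grid, xs, ys):
    """One chunk per (x, y) cell, carrying that cell's daily series."""
    return [{"series": grid[x][y], "x": x, "y": y}
            for x, y in itertools.product(xs, ys)]


def process_chunk(chunk):
    """Stand-in for simplecrop_process_chunk: here it just totals the series."""
    return (chunk["x"], chunk["y"], sum(chunk["series"]))


def run_mapped(chunks, max_workers=4):
    """Apply process_chunk to every chunk, like a mapped task.

    Executor.map returns results in the same order as the input chunks.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process_chunk, chunks))
```

Because each chunk is independent, the per-cell work can run in parallel without changing the results.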
from pyarrow.dataset import dataset

plant_df = dataset(
    os.path.join(OUTPUT_DIR, 'simplecrop-parallelism/simplecrop/run/plant'),
    partitioning=simplecrop_partitioning.to_arrow()).to_table().to_pandas()
plant_df
day_of_year plant_leaf_count air_accumulated_temp plant_matter plant_matter_canopy plant_matter_fruit plant_matter_root plant_leaf_area_index x y
0 121 2.00 0.00 0.30 0.25 0.05 0.00 0.01 10 21
1 123 2.20 0.00 0.65 0.55 0.10 0.00 0.02 10 21
2 126 2.50 0.00 1.20 1.02 0.18 0.00 0.03 10 21
3 129 2.79 0.00 1.91 1.62 0.29 0.00 0.04 10 21
4 132 3.09 0.00 2.80 2.38 0.42 0.00 0.06 10 21
... ... ... ... ... ... ... ... ... ... ...
526 282 12.06 186.05 1130.70 465.39 82.13 583.19 1.49 12 23
527 285 12.06 223.00 1138.88 465.39 82.13 591.36 1.34 12 23
528 288 12.06 243.50 1154.41 465.39 82.13 606.90 1.25 12 23
529 291 12.06 277.65 1170.74 465.39 82.13 623.22 1.11 12 23
530 293 12.06 304.90 1178.78 465.39 82.13 631.27 0.99 12 23

531 rows × 10 columns
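Each cell contributes its own daily series to plant_df, distinguished by the x and y partition columns. A natural follow-up is to pull out each cell's last simulated day. Here is a stdlib sketch that works on plant_df.to_dict('records'); final_day_by_cell is a hypothetical helper.

```python
def final_day_by_cell(rows):
    """Keep, for each (x, y) cell, the record with the largest day_of_year.

    `rows` is an iterable of dicts with at least 'x', 'y' and 'day_of_year'
    keys, e.g. plant_df.to_dict('records').
    """
    latest = {}
    for row in rows:
        key = (row["x"], row["y"])
        if key not in latest or row["day_of_year"] > latest[key]["day_of_year"]:
            latest[key] = row
    return latest
```

In pandas, plant_df.loc[plant_df.groupby(['x', 'y'])['day_of_year'].idxmax()] should select the same rows directly.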