Combining Overland Flow and Simple Crop
Running Overland Flow on its Own
The overland flow model routes water over a landscape and determines how much water infiltrates particular cells in response to rain events and elevation data. Rainfall in this model is assumed to occur instantaneously and rainfall events are assumed to be independent (so a large rainfall event on a previous day has no bearing on the present day).
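Here is a toy illustration of those two assumptions. It is not the routing algorithm the overland flow model actually uses. The sketch handles one rainfall event on a small grid. The whole event lands at once. Each cell infiltrates up to a hypothetical per-cell capacity, and any excess runs off to the cell's lowest neighbour that sits strictly lower. Cells are processed from highest to lowest, so upslope runoff arrives before a cell is handled. The function route_event and the capacity grid are made up for this example. The function takes only the elevation, one event's rainfall and the capacities, and it keeps no state between calls. As a result, every event is computed independently of the days before it.

```python
from typing import List

Grid = List[List[float]]


def route_event(elevation: Grid, rain: float, capacity: Grid) -> Grid:
    """Toy routing of one instantaneous rainfall event (illustration only).

    Every cell receives `rain` at once. Cells are visited from highest to
    lowest; each keeps up to its capacity as infiltration and passes any
    excess to its lowest strictly-lower 4-neighbour. Excess with nowhere
    lower to go simply leaves the grid. No state is carried between calls.
    """
    rows, cols = len(elevation), len(elevation[0])
    water = [[rain] * cols for _ in range(rows)]  # water arriving at each cell
    infiltrated = [[0.0] * cols for _ in range(rows)]
    # Highest cells first, so runoff reaches lower cells before they are processed.
    order = sorted(
        ((elevation[r][c], r, c) for r in range(rows) for c in range(cols)),
        reverse=True,
    )
    for _, r, c in order:
        infiltrated[r][c] = min(water[r][c], capacity[r][c])
        excess = water[r][c] - infiltrated[r][c]
        neighbours = [
            (r + dr, c + dc)
            for dr, dc in ((1, 0), (-1, 0), (0, 1), (0, -1))
            if 0 <= r + dr < rows and 0 <= c + dc < cols
        ]
        lower = [
            (elevation[nr][nc], nr, nc)
            for nr, nc in neighbours
            if elevation[nr][nc] < elevation[r][c]
        ]
        if excess > 0 and lower:
            _, nr, nc = min(lower)  # steepest descent: the lowest neighbour
            water[nr][nc] += excess
    return infiltrated
```

Take a three-cell slope with capacities 1, 2 and 8. The top two cells fill to capacity and pass their excess downhill, so the valley cell ends up infiltrating 6 units. In other words, route_event([[3.0, 2.0, 1.0]], 3.0, [[1.0, 2.0, 8.0]]) returns [[1.0, 2.0, 6.0]], and calling it again gives the same answer.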
To run the model, we first import the resource types used as sources and sinks, along with the classes for calling CLI models and building requests.
import itertools
import os.path
import netCDF4
from meillionen.interface.resource import Feather, NetCDF, OtherFile, Parquet
from meillionen.interface.schema import PandasHandler, NetCDFHandler
from meillionen.settings import Settings, Partitioning
from meillionen.client import Client, CLIRef
from prefect import task, Flow
BASE_DIR = '../../examples/crop-pipeline'
INPUT_DIR = os.path.join(BASE_DIR, 'workflows/inputs')
OUTPUT_DIR = os.path.join(BASE_DIR, 'workflows/outputs')
# run the models as Python scripts
# (for Binder compatibility only, since Binder does not support Poetry yet)
OVERLANDFLOW = os.path.join(BASE_DIR, 'overlandflow/overlandflow_omf/cli.py')
SIMPLECROP = os.path.join(BASE_DIR, 'simplecrop/simplecrop_omf/cli.py')
ov_path = os.path.realpath(os.path.join(BASE_DIR, 'overlandflow'))
sc_path = os.path.realpath(os.path.join(BASE_DIR, 'simplecrop'))
# make both model packages importable when the scripts are run
os.environ['PYTHONPATH'] = os.pathsep.join([ov_path, sc_path])
# alternatively, run the models as installed Python packages
# (requires `poetry install` for the simplecrop and overlandflow packages first)
# OVERLANDFLOW = "overlandflow-omf"
# SIMPLECROP = "simplecrop-omf"
settings = Settings(
base_path=OUTPUT_DIR
)
Next, create a client for the overland flow model that uses these settings.
overlandflow = Client(CLIRef(OVERLANDFLOW), settings=settings)
overlandflow
<meillionen.client.Client at 0x7f3e703ec5e0>
Then call the overland flow model with its input and output resources. This creates files on the file system.
overland_payloads = overlandflow.run(
    class_name='overlandflow',
    method_name='run',
    resource_payloads={
        'elevation': OtherFile(os.path.join(INPUT_DIR, 'elevation.asc')),
        'weather': Feather(os.path.join(INPUT_DIR, 'weather.feather')),
        'soil_water_infiltration__depth': NetCDF.partial(variable='swid')
    }
)
Running Simple Crop on its Own
simplecrop is a model of yearly crop growth that runs on the command line. When called with no arguments, it expects the current directory to contain a data folder holding five files:
- irrig.inp: daily irrigation
- plant.inp: plant growth parameters for the simulation
- simctrl.inp: simulation reporting parameters
- soil.inp: soil characteristic parameters for the simulation
- weather.inp: daily weather data (with variables like maximum temperature and solar energy flux)
simplecrop also expects an output folder. After the model has run, it populates the output folder with three files:
- plant.out: daily plant characteristics
- soil.out: daily soil characteristics
- wbal.out: summary soil and plant statistics for the simulation
Suppose we want to run the model without building those input files by hand, and without manually converting the output files into a format suited to analysis. To do that, we need a wrapper that constructs the input files and parses the output files. Fortunately, such a model wrapper already exists, so simplecrop can be called through a Client just like overland flow.
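The wrapper pattern described above can be sketched as three steps: write the inputs, run the CLI, and read the outputs. This is a generic illustration, not the code in simplecrop_omf. The function run_in_workdir and the write_inputs callback are hypothetical. The .inp and .out formats are not described here, so the sketch writes whatever the callback produces and returns the raw text of each .out file rather than parsing it. The usage runs a tiny Python one-liner as a stand-in for the simplecrop executable.

```python
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Callable, Dict, List

# The five inputs simplecrop reads from data/ (names from the list above).
INPUT_FILES = ["irrig.inp", "plant.inp", "simctrl.inp", "soil.inp", "weather.inp"]


def run_in_workdir(
    write_inputs: Callable[[Path], None],
    command: List[str],
    workdir: Path,
) -> Dict[str, str]:
    """Prepare data/ and output/, run `command` in `workdir`, return the .out texts."""
    data_dir, out_dir = workdir / "data", workdir / "output"
    data_dir.mkdir(parents=True, exist_ok=True)
    out_dir.mkdir(parents=True, exist_ok=True)
    write_inputs(data_dir)  # the caller serialises its tables into the .inp files
    missing = [name for name in INPUT_FILES if not (data_dir / name).exists()]
    if missing:
        raise FileNotFoundError(f"missing inputs: {missing}")
    subprocess.run(command, cwd=workdir, check=True)
    # A real wrapper would parse plant.out, soil.out and wbal.out into tables;
    # this sketch just returns the raw text keyed by file name.
    return {p.name: p.read_text() for p in sorted(out_dir.glob("*.out"))}


def write_empty_inputs(data_dir: Path) -> None:
    for name in INPUT_FILES:
        (data_dir / name).write_text("")


# Stand-in for the model binary: it writes a single output file.
stand_in = [sys.executable, "-c", "open('output/plant.out', 'w').write('ok')"]
with tempfile.TemporaryDirectory() as tmp:
    outputs = run_in_workdir(write_empty_inputs, stand_in, Path(tmp))
print(outputs)  # {'plant.out': 'ok'}
```

The sketch checks for the inputs before running anything. A missing input then fails with a clear FileNotFoundError instead of an opaque error from the model itself.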
import pandas as pd
simple_crop = Client(CLIRef(SIMPLECROP), settings=settings)
simple_crop_response = simple_crop.run(
class_name='simplecrop',
method_name='run',
resource_payloads={
'daily': Feather(os.path.join(BASE_DIR, 'simplecrop/data/daily.feather')),
'yearly': Feather(os.path.join(BASE_DIR, 'simplecrop/data/yearly.feather')),
'plant': Feather.partial(),
'soil': Feather.partial(),
'raw': OtherFile.partial(ext='')
})
soil_df = simple_crop_response.load('soil')
soil_df
|  | day_of_year | soil_daily_runoff | soil_daily_infiltration | soil_daily_drainage | soil_evapotranspiration | soil_evaporation | plant_potential_transpiration | soil_water_storage_depth | soil_water_profile_ratio | soil_water_deficit_stress | soil_water_excess_stress |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 3 | 0.0 | 0.0 | 1.86 | 2.25 | 2.23 | 0.02 | 260.97 | 1.800 | 1.000 | 1.0 |
| 1 | 6 | 0.0 | 0.0 | 2.25 | 2.64 | 2.62 | 0.02 | 264.09 | 1.821 | 1.000 | 1.0 |
| 2 | 9 | 0.0 | 0.0 | 0.94 | 1.32 | 1.31 | 0.01 | 253.64 | 1.749 | 1.000 | 1.0 |
| 3 | 12 | 0.0 | 0.0 | 0.57 | 2.56 | 2.53 | 0.02 | 249.09 | 1.718 | 1.000 | 1.0 |
| 4 | 15 | 0.0 | 0.0 | 0.00 | 1.94 | 1.89 | 0.02 | 241.54 | 1.666 | 1.000 | 1.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 94 | 282 | 0.0 | 0.0 | 0.00 | 4.04 | 0.61 | 1.55 | 154.76 | 1.067 | 0.566 | 1.0 |
| 95 | 285 | 0.0 | 0.0 | 0.00 | 1.64 | 0.26 | 0.56 | 152.05 | 1.049 | 0.544 | 1.0 |
| 96 | 288 | 0.0 | 0.0 | 0.00 | 2.92 | 0.46 | 0.88 | 146.65 | 1.011 | 0.499 | 1.0 |
| 97 | 291 | 0.0 | 0.0 | 0.00 | 4.84 | 0.75 | 1.25 | 140.73 | 0.971 | 0.449 | 1.0 |
| 98 | 293 | 0.0 | 0.0 | 0.00 | 4.18 | 0.65 | 0.95 | 137.47 | 0.948 | 0.422 | 1.0 |
99 rows × 11 columns
Using Prefect to Feed Overland Flow Results into Simple Crop
Now we’ll combine overlandflow with simplecrop. This requires an adapter that sets the rainfall column of the daily input data fed into simplecrop to the depth of water that infiltrated the corresponding cell. To keep the run short, the workflow below calls simplecrop only for a 3 × 3 block of cells (x from 10 to 12, y from 21 to 23) rather than for every x, y coordinate in the map.
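The adapter and the chunking step can be sketched with plain Python lists in place of the NetCDF output and the pandas DataFrame. The sketch assumes that infiltration depth is indexed by x, then y, then day, as the indexing swid[x, y, :] in the workflow suggests. It also assumes there is one depth per row of the daily data. The helper names cell_daily_records and chunks, and the sample data, are hypothetical.

```python
import itertools
from typing import Dict, List, Sequence


def cell_daily_records(
    daily: List[Dict[str, float]], depth_series: Sequence[float]
) -> List[Dict[str, float]]:
    """Copy the daily records, setting 'rainfall' to the cell's infiltration depth."""
    if len(daily) != len(depth_series):
        raise ValueError("need one infiltration depth per day")
    # Same effect as daily_df.assign(rainfall=...): the column is overwritten
    # (or added) in a copy, and the original records are left untouched.
    return [{**row, "rainfall": depth} for row, depth in zip(daily, depth_series)]


def chunks(swid, xs: range, ys: range) -> List[dict]:
    """One chunk per (x, y) cell, in itertools.product order."""
    return [
        {"soil_water_infiltration__depth": swid[x][y], "x": x, "y": y}
        for x, y in itertools.product(xs, ys)
    ]


# Hypothetical depths for a 13 x 24 grid over 3 days: [x, y, 0.0] per cell.
swid = [[[float(x), float(y), 0.0] for y in range(24)] for x in range(13)]
daily = [{"day": d, "rainfall": 5.0, "tmax": 30.0} for d in range(3)]

cs = chunks(swid, range(10, 13), range(21, 24))
print(len(cs), (cs[0]["x"], cs[0]["y"]), (cs[-1]["x"], cs[-1]["y"]))  # 9 (10, 21) (12, 23)
first = cell_daily_records(daily, cs[0]["soil_water_infiltration__depth"])
```

The chunks follow itertools.product order. As a result, the mapped task index in Prefect's log should correspond to a cell: simplecrop_process_chunk[0] handles (10, 21) and simplecrop_process_chunk[8] handles (12, 23).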
import pyarrow as pa
import pandas as pd
import pathlib
from meillionen.interface.base import MethodRequestArg
trial = settings.trial("simplecrop-parallelism")
overlandflow = Client(CLIRef(OVERLANDFLOW), settings=trial)
simplecrop_partitioning = Partitioning(
pa.schema([("x", pa.int32()), ("y", pa.int32())]))
simple_crop = Client(CLIRef(SIMPLECROP), settings=trial)
daily_df = pd.read_feather(os.path.join(BASE_DIR, 'simplecrop/data/daily.feather'))
yearly = Feather(os.path.join(BASE_DIR, 'simplecrop/data/yearly.feather'))
@task()
def run_overland_flow():
    partial_resource_payloads = {
        'elevation': OtherFile(os.path.join(INPUT_DIR, 'elevation.asc')),
        'weather': Feather(os.path.join(INPUT_DIR, 'weather.feather')),
        'soil_water_infiltration__depth': NetCDF.partial(variable='swid')
    }
    response = overlandflow.run(
        class_name='overlandflow',
        method_name='run',
        resource_payloads=partial_resource_payloads
    )
    return response.load('soil_water_infiltration__depth')
@task()
def chunkify_soil_water_infiltration_depth(swid):
    # one chunk per (x, y) cell; only a 3 x 3 block of cells is used here
    return [{'soil_water_infiltration__depth': swid[x, y, :], 'x': x, 'y': y}
            for (x, y) in itertools.product(range(10, 13), range(21, 24))]
@task()
def simplecrop_process_chunk(data):
    soil_water_infiltration__depth = data['soil_water_infiltration__depth']
    x, y = data['x'], data['y']
    partition = simplecrop_partitioning.complete(x=x, y=y)
    # use the infiltrated depth for this cell as simplecrop's rainfall input
    daily_xy_df = daily_df.assign(rainfall=soil_water_infiltration__depth)
    # save the per-cell daily input so it can be passed to simplecrop
    daily = simple_crop.save(
        mra=MethodRequestArg(class_name='simplecrop', method_name='run', arg_name='daily'),
        resource=Feather.partial(),
        data=daily_xy_df,
        partition=partition)
    resource_payloads = {
        'daily': daily,
        'yearly': yearly,
        'plant': Parquet.partial(),
        'soil': Parquet.partial(),
        'raw': OtherFile.partial(ext='')
    }
    response = simple_crop.run(
        class_name='simplecrop',
        method_name='run',
        resource_payloads=resource_payloads,
        partition=partition)
with Flow('crop_pipeline') as flow:
    overland_flow = run_overland_flow()
    surface_water_depth_chunks = chunkify_soil_water_infiltration_depth(overland_flow)
    # run simplecrop once per chunk
    yield_chunks = simplecrop_process_chunk.map(surface_water_depth_chunks)
flow.run()
[2021-10-08 14:53:22-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'crop_pipeline'
[2021-10-08 14:53:22-0700] INFO - prefect.TaskRunner | Task 'run_overland_flow': Starting task run...
[2021-10-08 14:53:27-0700] INFO - prefect.TaskRunner | Task 'run_overland_flow': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:27-0700] INFO - prefect.TaskRunner | Task 'chunkify_soil_water_infiltration_depth': Starting task run...
[2021-10-08 14:53:27-0700] INFO - prefect.TaskRunner | Task 'chunkify_soil_water_infiltration_depth': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:27-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk': Starting task run...
[2021-10-08 14:53:27-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk': Finished task run for task with final state: 'Mapped'
[2021-10-08 14:53:27-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[0]': Starting task run...
[2021-10-08 14:53:28-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[0]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:28-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[1]': Starting task run...
[2021-10-08 14:53:30-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[1]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:30-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[2]': Starting task run...
[2021-10-08 14:53:31-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[2]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:31-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[3]': Starting task run...
[2021-10-08 14:53:33-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[3]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:33-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[4]': Starting task run...
[2021-10-08 14:53:34-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[4]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:34-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[5]': Starting task run...
[2021-10-08 14:53:36-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[5]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:36-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[6]': Starting task run...
[2021-10-08 14:53:37-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[6]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:37-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[7]': Starting task run...
[2021-10-08 14:53:39-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[7]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:39-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[8]': Starting task run...
[2021-10-08 14:53:40-0700] INFO - prefect.TaskRunner | Task 'simplecrop_process_chunk[8]': Finished task run for task with final state: 'Success'
[2021-10-08 14:53:40-0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
<Success: "All reference tasks succeeded.">
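The path read below suggests a layout for outputs: base_path/trial/class_name/method_name/arg_name. In this run, simplecrop-parallelism is the trial and plant is the argument name. The hypothetical helper sink_dir simply encodes that layout. The layout is inferred from this one path, not taken from meillionen's documentation or API.

```python
import os.path


def sink_dir(base_path: str, trial: str, class_name: str, method_name: str, arg_name: str) -> str:
    # Hypothetical helper. The layout is inferred from the single path used in
    # this notebook, not from meillionen's docs:
    #   <base_path>/<trial>/<class_name>/<method_name>/<arg_name>
    return os.path.join(base_path, trial, class_name, method_name, arg_name)


for arg in ("plant", "soil"):
    print(sink_dir("outputs", "simplecrop-parallelism", "simplecrop", "run", arg))
```

If that assumption holds, the per-cell soil outputs would sit in the sibling soil directory. They could then be read with pyarrow.dataset.dataset in the same way as plant.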
from pyarrow.dataset import dataset
plant_df = dataset(
os.path.join(OUTPUT_DIR, 'simplecrop-parallelism/simplecrop/run/plant'),
partitioning=simplecrop_partitioning.to_arrow()).to_table().to_pandas()
plant_df
|  | day_of_year | plant_leaf_count | air_accumulated_temp | plant_matter | plant_matter_canopy | plant_matter_fruit | plant_matter_root | plant_leaf_area_index | x | y |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 121 | 2.00 | 0.00 | 0.30 | 0.25 | 0.05 | 0.00 | 0.01 | 10 | 21 |
| 1 | 123 | 2.20 | 0.00 | 0.65 | 0.55 | 0.10 | 0.00 | 0.02 | 10 | 21 |
| 2 | 126 | 2.50 | 0.00 | 1.20 | 1.02 | 0.18 | 0.00 | 0.03 | 10 | 21 |
| 3 | 129 | 2.79 | 0.00 | 1.91 | 1.62 | 0.29 | 0.00 | 0.04 | 10 | 21 |
| 4 | 132 | 3.09 | 0.00 | 2.80 | 2.38 | 0.42 | 0.00 | 0.06 | 10 | 21 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 526 | 282 | 12.06 | 186.05 | 1130.70 | 465.39 | 82.13 | 583.19 | 1.49 | 12 | 23 |
| 527 | 285 | 12.06 | 223.00 | 1138.88 | 465.39 | 82.13 | 591.36 | 1.34 | 12 | 23 |
| 528 | 288 | 12.06 | 243.50 | 1154.41 | 465.39 | 82.13 | 606.90 | 1.25 | 12 | 23 |
| 529 | 291 | 12.06 | 277.65 | 1170.74 | 465.39 | 82.13 | 623.22 | 1.11 | 12 | 23 |
| 530 | 293 | 12.06 | 304.90 | 1178.78 | 465.39 | 82.13 | 631.27 | 0.99 | 12 | 23 |
531 rows × 10 columns
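A natural next step is to summarise each cell by its last simulated day. The sketch below does this in plain Python on a few rows copied from the plant_df output above. Because only a few rows are copied, for cell (10, 21) it sees just the first days of the season. final_rows is a hypothetical helper. With pandas, plant_df.sort_values('day_of_year').groupby(['x', 'y']).tail(1) should select the corresponding rows from the full table.

```python
from typing import Dict, List, Tuple


def final_rows(rows: List[dict]) -> Dict[Tuple[int, int], dict]:
    """Return, for each (x, y) cell, the row with the latest day_of_year."""
    latest: Dict[Tuple[int, int], dict] = {}
    for row in rows:
        key = (row["x"], row["y"])
        if key not in latest or row["day_of_year"] > latest[key]["day_of_year"]:
            latest[key] = row
    return latest


# A few rows copied from the plant_df output above (subset of columns).
rows = [
    {"day_of_year": 121, "plant_matter": 0.30, "x": 10, "y": 21},
    {"day_of_year": 132, "plant_matter": 2.80, "x": 10, "y": 21},
    {"day_of_year": 291, "plant_matter": 1170.74, "x": 12, "y": 23},
    {"day_of_year": 293, "plant_matter": 1178.78, "x": 12, "y": 23},
]
for cell, row in sorted(final_rows(rows).items()):
    print(cell, row["day_of_year"], row["plant_matter"])
# (10, 21) 132 2.8
# (12, 23) 293 1178.78
```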