Implementation of Use Case 2#
This documentation explains how the workflow to simulate viscoelastic pastes, using OpenFOAM and the rheoTool lybrary, is incorporated within the MarketPlace. In detail, this manual provides an overview on most of the function having been created in this Use Case. This manual should serve as a detailed explanation on how to onboard your very own software in the MarketPlace.
Everything is organized within one folder and we will slowly go through each folder and file therein.
[Folder] simulation_controller
[Folder] static
.gitmodules
app.py
deploy_heroku.sh
docker-compose.yml
Dockerfile
openAPI.yml
prepare_deployment.sh
requirements.txt
Create a singularity container#
In order to insure the correct working of the workflow, we create a container which install all the software we need
Start from the image which already has mpi installed
FROM containers4hpc/base-mpich314:0.1.0
SHELL [ "/bin/bash", "-c" ]
Be sure to install all the basic software you need, after which we can install OpenFOAM, this worflow work with OpenFOAM 6.
RUN sh -c "wget -O - http://dl.openfoam.org/gpg.key | apt-key add -" ;\
add-apt-repository http://dl.openfoam.org/ubuntu ;\
apt-get update ;\
apt-get install -y --no-install-recommends openfoam6 ;\
rm -rf /var/lib/apt/lists/* ;\
echo "source /opt/openfoam6/etc/bashrc" >> ~foam/.bashrc ;\
echo "export OMPI_MCA_btl_vader_single_copy_mechanism=none" >> ~foam/.bashrc
After which we can install RheoTools
RUN git clone https://github.com/fppimenta/rheoTool.git;\
cd rheoTool;\
git checkout 56c2701636e6eb59dc51799fd0636805be23510d
RUN source /opt/openfoam6/etc/bashrc;\
cd rheoTool/of60;\
./downloadEigen;\
echo "export EIGEN_RHEO=$WM_PROJECT_USER_DIR/ThirdParty/Eigen3.2.9" >> ~foam/.bashrc
and install all the python dependencies that the workflow needs
#install python packages
RUN pip3 install numpy
RUN pip3 install --upgrade pillow
RUN pip3 install --upgrade matplotlib
RUN pip3 install scipy
RUN pip3 install vtk
Once the Dockerfile has all you need, you can create a docker image
docker build -t uc2 .
after saving the image as a .tar file you can create a singularity container
singularity build (--sandbox) uc2 docker-archive://uc2.tar
singularity build uc2.sif uc2
This container can be run anywhere
simulation controller#
Now we are working on the folder “simulation_controller” which contains several files that provide the function to create, start and stop a simulation as well as retrieving the simulation results.
Let us have a look at the following files that are all found in the folder “simulation_controller”
__init__.py
config.py
paste_files_creation.py
post_processing.py
simulation_manager.py
simulation_base.py
simulation_hpc.py
__init__.py#
The file “__init__.py” is an empty file and its only purpose is that python allows to include all function via the regular package syntax as libraries. This means in all files, we can include classes and function of other files with simple commands. Usually, python has no problem importing a whole directory. But when it comes to importing a class from a file in a directory, this will raise an exception. But having such a “__init__.py” file allows to use the following notation
from directory.filename import classname
when having a class “classname” in a file “filename” within the directoy “directory”.
config.py#
In this file, we have defined two classes with names “SimulationStatus” and “SimulationConfig” and follows with the definition for the simulation states. Here, we define 5 different kinds of states that are
created
running
completed
stopped
error
the class structure looks as follow
import logging
from enum import Enum
class SimulationStatus(Enum):
CREATED = 1
INPROGRESS = 2
COMPLETED = 3
STOPPED = 4
DOWNLOADED = 5
ERROR = 6
By writing “Enum” into the bracket, this class inheriting from the Enum class which is a built-in class from python. This allows to use a more natural syntax to ask the script whether the simulation has been created or whether it is running. In fact, we can apply the following notation
state = SimulationStatus.CREATED # which is equal to "CREATED"
# and then ask for that state by
if state == SimulationStatus.CREATED:
print('simulation has been created')
which is a readable syntax to ask for the state of a simulation. The logging module at the beginning of the snippet is a python in-built library that simplifies to write log files in which error messages are written to. This allows for example to write error messages
logging.error("This is my error message")
or info messages
logging.info("This is my info message")
It follows the simulation configuration
class SimulationConfig:
def __init__(self, request_obj: dict):
err_msg = f"Error creating simulation: {str(request_obj)}. "
self.configuration: int = int(request_obj.get("configuration", 1))
self.endTime: float = float(request_obj.get("endTime", 0.0002))
self.writeInterval: float = float(request_obj.get("writeInterval", 1e-4))
self.deltaT: float = float(request_obj.get("deltaT", 1e-8))
self.initWide: float = float(request_obj.get("initial_wide", 300))
self.UpVel: float = float(request_obj.get("upward_velocity", 0.003))
self.rheometer: bool = bool(request_obj.get("rheometer", False))
self.creation_time = time.time()
if self.deltaT > self.endTime:
err_msg += (
"the simulation time step should be smaller than the total simulation time"
)
logging.error(err_msg)
raise ValueError(err_msg)
This is a class that contains only an init method. This is the function that is called whenever an instant of the SimulationConfig class is created. Basically, this function receives the input parameters that are made available for the Use Case tutorial. These were
Total simulation time (s)
How often does the simulation write data (s)
Simulation time step (s)
Initial paste width (mm)
Upward velocity of the grid (m/s)
Boolean value which allow to chose if the user want to run the rheometer step of the app. In case is false the app will run with default values for the paste parameters. and the corresponding parameters were fed into a dictionary. At the beginning, each of the keys from this dictionary is called and in the case that this key has not been defined, a default is being returned. For example the code line
self.configuration: int = request_obj.get("configuration", 1)
asks if there is a key with the name “configuration” in the dictionary “request_obj”. If the key is present, its value is returned (that is the value that the user has provided in the MarketPlace interface - to be explained later). If the key has not been defined, we use the default value of “1”. This value for the configuration is mapped to an integer and stored in the variable “self.configuration” to make it available within the instant of the SimulationConfig object.
The same procedure is done for the other parameters. Additionally, we applied some checks to make sure the user input variables are in a physically valid range.
Additionally, the file also contains the following two lines of code
# Global Constant to define the extension of zip files
ZIP_EXTENSION = "zip"
# Global constant to define the path of the folder where all the simulations are saved
SIMULATIONS_FOLDER_PATH = "/app/simulation_files"
Which could also occur somewhere else and define global constants which are the folder path in which all simulation results are about to appear and the extension for the compression.
paste_files_creation.py#
This file contains two function need to create the simulation file. The first is “defineInitfield” which will create the initial field that will be read by OpenFOAM during the simulation
import os
import shutil
from distutils.dir_util import copy_tree
from simulation_controller.config import ( SIMULATIONS_FOLDER_PATH,SimulationConfig,)
def defineInitfield(wideIn,U_up,folderFSF):
The second is “create_input_files”
def create_input_files(foldername: str, simulationConfig: SimulationConfig):
"""
Function to create the start configuration for the MarketPlace simulation.
simulationConfig : SimulationConfig
instance with the specific configuration values for a run
"""
This function access the variables that have been defined in the SimulationConfig from above, and write out the necessary file for the simulation to run.
post_processing.py#
In this script we perform the post-processing of the data generated by the simulation.
We use the VTK library to read the data
import numpy as np
import vtk
import matplotlib.pyplot as plt
...
def extractInterfaceInfo(directory,nameCase):
VTK=directory+'/VTK/'+nameCase+'_'+str(f'{nameCa:.5g}')+'.vtk'
reader = vtk.vtkUnstructuredGridReader()
reader.SetFileName(VTK)
reader.Update()
...
then we use the matplotlib library to plot the interface width evolution with time
def interfaceTracing(directory,nameP):
interfaceAll=[]
broadAll=[]
markers = ['-o', '-.', '-,', '-x', '-+', '-v', '-^', '-<', '>-', '-s', '-d']
i=0
fig2, ax2 = plt.subplots(figsize=(16, 9))
fontdict={'fontsize': 20}
[interfaceOut,broadOut]=extractInterfaceInfo(directory,nameP)
interfaceAll.append(interfaceOut)
broadAll.append(broadOut)
plotInterf(interfaceOut,nameP,directory)
plt.close()
ax2.plot(broadOut[:,0],broadOut[:,1]/broadOut[0,1],markers[i], label=nameP)
ax2.set_adjustable("box")
ax2.set_title('Normalized paste broadening', fontdict)
ax2.set_ylabel('Normalized Broadening', fontdict)
ax2.set_xlabel('time [$s$]', fontdict)
ax2.tick_params(axis='both', which='major', labelsize=20)
ax2.tick_params(axis='both', which='minor', labelsize=20)
#fig2.legend()
fig2.savefig(directory+'/rheometerResults/paste_Broadening.png')
and the interface profile
def plotInterf(interface,name,directory):
#lb=['coarse','medium','fine']
fig3, ax3 = plt.subplots(figsize=(16, 9))
fontdict={'fontsize': 20}
y = abs(1e3-interface[0][:,0]*1e6)
z = interface[0][:,1]*1e6
plt.plot(y,z,label='init')
y = abs(1e3-interface[-1][:,0]*1e6)
z = interface[-1][:,1]*1e6
plt.plot(y,z,label='end')
#fig3.tight_layout()
ax3.axis('equal')
ax3.set_adjustable("box")
ax3.set_ybound(lower=0.0, upper=9e-5*1e6)
ax3.set_xbound(lower=0, upper=335)
ax3.set_ylabel('z [$\mu m$]', fontdict)
ax3.set_xlabel('y [$\mu m$]', fontdict)
#fig3.legend()
fig3.legend(bbox_to_anchor=(0.78, 0.75), loc='upper left', fontsize=20)
ax3.set_title('Interface comparison', fontdict)
ax3.tick_params(axis='both', which='major', labelsize=20)
ax3.tick_params(axis='both', which='minor', labelsize=20)
nf=directory+'/rheometerResults/interfaceComparison.png'
plt.savefig(nf)
simulation_manager.py#
This script is designed as the interface between the MarketPlace and the simulation.
class SimulationManager:
def __init__(self):
self.simulations: dict = {}
def _get_simulation(self, job_id: str) -> Simulation:
"""
Get the simulation corresponding to the job_id.
Args:
job_id (str): unique id of he simulation
Raises:
KeyError: if there is no simulation matching the id
Returns:
Simulation instance
"""
try:
simulation = self.simulations[job_id]
return simulation
except KeyError as ke:
message = f"Simulation with id '{job_id}' not found"
logging.error(message)
raise KeyError(message) from ke
def _add_simulation(self, simulation: Simulation) -> str:
"""Append a simulation to the internal datastructure.
Args:
simulation (Simulation): Object to add
Returns:
str: ID of the added object
"""
job_id: str = simulation.job_id
self.simulations[job_id] = simulation
return job_id
def _delete_simulation(self, job_id: str):
"""Remove a simulation from the internal datastructure.
Args:
job_id (str): id of the simulation to remove
"""
del self.simulations[job_id]
def create_simulation(self, request_obj: dict) -> str:
"""Create a new simulation given the arguments.
Args:
requestObj: dictionary containing input configuration
Returns:
str: unique job id
"""
return self._add_simulation(Simulation(request_obj))
def run_simulation(self, job_id: str):
"""Execute a simulation.
Args:
job_id (str): unique simulation id
"""
self._get_simulation(job_id).run()
def get_simulation_output(self, job_id: str) -> str:
"""Get the output a simulation.
Args:
job_id (str): unique simulation id
Returns:
str: json representation of the dlite object
"""
mapping = "SimpartixOutput"
mimetype = "vnd.sintef.dlite+json"
simulation = self._get_simulation(job_id)
return simulation.get_output(), mapping, mimetype
def stop_simulation(self, job_id: str) -> dict:
"""Force termination of a simulation.
Args:
job_id (str): unique id of the simulation
"""
self._get_simulation(job_id).stop()
def delete_simulation(self, job_id: str) -> dict:
"""Delete all the simulation information.
Args:
job_id (str): unique id of simulation
"""
self._get_simulation(job_id).delete()
self._delete_simulation(job_id)
def get_simulation_state(self, job_id: str) -> SimulationStatus:
"""Return the status of a particular simulation.
Args:
job_id (str): id of the simulation
Returns:
SimulationStatus: status of the simulation
"""
return self._get_simulation(job_id).status
def get_simulation_list(self) -> list:
"""Return unique ids of all the simulations.
Returns:
list: list of simulation ids
"""
return list(self.simulations.keys())
simulation_base.py#
This file contains the base class SimulationBase for managing a single simulation. It provides methods for starting a simulation, stopping it, zipping the output files, and deleting the simulation. The class also has a property for retrieving the status of the simulation.
import os
import uuid
from simulation_controller.config import (
SimulationStatus,
ZIP_EXTENSION,
SIMULATIONS_FOLDER_PATH,
SimulationConfig,
)
import logging
class SimulationBase:
"""Manage a single simulation."""
def __init__(self, request_obj: dict = {}):
"""
Initialize a SimulationBase object.
Args:
request_obj (dict, optional): A dictionary containing the simulation request details.
Defaults to an empty dictionary.
"""
self.job_id: str = str(uuid.uuid4())
self.simulationPath = os.path.join(SIMULATIONS_FOLDER_PATH, self.job_id)
self.simulation_config: SimulationConfig = SimulationConfig(request_obj)
self._status = None
def status(self, value: SimulationStatus):
"""
Set the status of the simulation.
Args:
value (SimulationStatus): The status value to set.
"""
self._status = value
def run(self):
...
def stop(self):
...
def zip_output(self) -> str:
...
def delete(self):
...
simulation.py#
This file define a class “SimulationHPC” that inherit from the “SimulationBase” and uses the Marketplace hpc app to set up a calculation on a remote server.
from simulation_controller.simulation_base import SimulationBase
from simulation_controller.config import (
SimulationStatus,
ZIP_EXTENSION,
SIMULATIONS_FOLDER_PATH,
SimulationConfig,
)
from simulation_controller.paste_files_creation import (
create_input_files
)
import logging
from hpc_gateway_sdk import get_app
import json
from distutils.file_util import copy_file
MP_ACCESS_TOKEN=''
class SimulationHPC(SimulationBase):
"""Manage a HPC simulation."""
def __init__(self, request_obj: dict = {}):
super().__init__(request_obj)
create_input_files(self.simulationPath, SimulationConfig(request_obj))
self._status: SimulationStatus = SimulationStatus.CREATED
self._hpc: HPCApp = get_app(name="mc",access_token=MP_ACCESS_TOKEN)
self._resource_id = self._hpc.create_user()
self.job_id = self._hpc.create_job(new_transformation={
"job_name": "uc2_demo",
"ntasks_per_node": 2,
"partition": "debug",
"image": "path-to-simgularity-container/uc2.sif",
"executable_cmd": "bash runJob.sh",
})
self._hpc_job_info = None
logging.info(
f"Simulation '{self.job_id}' with " f"configuration {request_obj} created."
)
in the “hpc.create_job” we indicate the path where the singularity container we have created in the first section is stored, together with the “runJob.sh” which is a bash script that execute all the workflow step.
Inside the “run” method of the “SimulationHPC” class we find
self._hpc.upload_file(self.job_id,filename='simulation.zip', source_path=os.path.join(SIMULATIONS_FOLDER_PATH,'simulation.zip'))
self._hpc.upload_file(self.job_id, filename='runJob.sh',source_path=os.path.join(templateDirPath,'runJob.sh'))
# start simulation
rsp = self._hpc.launch_job(self.job_id)
self.status = SimulationStatus.INPROGRESS
logging.info(f"Simulation '{self.job_id}' started successfully.")
which shows how to upload file on the server and launch a job.
Inside the “postprocess_output” there is a example of how to download file created by the simulation.
img_list=['paste_Broadening.png','interfaceComparison.png']#,'modulus_results.png','viscosity_results.png']
for imag in img_list:
file='simulation/caseBase/rheometerResults/'+imag
resp = self._hpc.download_file(self.job_id, filename=file)
with open(os.path.join(imagPath,imag), 'wb') as csr:
for chunk in resp.iter_content(chunk_size=1024):
if chunk:
csr.write(chunk)
Here for example we download the two images computed by the functions in “post_process.py”.