Newer
Older
"""
mpsd-software: tool for installation of software as on MPSD HPC.
"""
import argparse
import datetime
import logging
from typing import List, Tuple, Union
# If 'rich' is available ("pip install rich" or "apt-get install python3-rich"),
# then use coloured output, otherwise proceed as before
try:
import rich.logging
except ModuleNotFoundError:
rich_available = False
else:
rich_available = True
This function builds toolchains for MPSD-HPC at the appropriate directory, \n
for given system architecture and MPSD software stack version.\n
The toolchains
are built using the bash script spack_setup.sh, and the results are logged. """
call_date_iso = (
datetime.datetime.now().replace(microsecond=0).isoformat().replace(":", "-")
)
"cmd_log_file": "mpsd-software.log",
"metadata_tag_open": "!<meta>",
"metadata_tag_close": "</meta>!",
"spack_environments_repo": "https://gitlab.gwdg.de/mpsd-cs/spack-environments.git",
}
def create_log_file_names(
mpsd_release: str,
action: str,
date: str = call_date_iso,
toolchain: str = None,
) -> Union[str, None]:
This function creates the log file names for either the installer or
If a toolchain is given, then the build log file name is created.
if no toolchain is given, then the installer log file name is created.
The installer log file hosts the logs of the installer script, while
the build log file hosts the logs of the build process as generated by the
spack_setup.sh script.
Parameters
----------
mpsd_release : str
MPSD software stack version
system architecture
date : str
date of the call in ISO format
action : str
action performed (install,remove,reinstall,prepare,status)
only install and remove are valid for build log file.
toolchain : str
toolchain name (only for build log file)
log file name
installer_log_file_name or build_log_file_name depending on the
parameters given.
If the action is not one that changes the files on disk ( info only actions)
then None is returned.
if toolchain:
# if toolchain is given, then we build the build_log_file_name
if action in ["install", "remove"]:
f"{mpsd_release}_{microarch}_{date}_BUILD_{toolchain}_{action}.log"
else:
# if toolchain is not given, then we build the installer_log_file_name
log_file_name = f"{mpsd_release}_{microarch}_{date}_APEX_{action}.log"
return log_file_name
def log_metadata(key: str, value: str) -> None:
    """Write a single ``key:value`` metadata record to the log.

    The record is wrapped in the opening and closing metadata tags taken
    from ``config_vars`` and emitted at INFO level via the ``logging``
    module, so that it can be found again by searching the log for the tags.

    Parameters
    ----------
    key : str
        key of the metadata
    value : str
        value of the metadata

    Returns
    -------
    None
    """
    tag_open = config_vars["metadata_tag_open"]
    tag_close = config_vars["metadata_tag_close"]
    logging.info(f"{tag_open}{key}:{value}{tag_close}")
def read_metadata_from_logfile(logfile: Union[str, Path]) -> dict:
    """Read metadata records from a log file.

    Metadata records have the form
    ``<metadata_tag_open>{key}:{value}<metadata_tag_close>`` where the two
    tags are taken from ``config_vars``. Both key and value must consist of
    word characters only (regex ``\\w+``); records that do not fit this
    shape are ignored.

    Parameters
    ----------
    logfile : str or Path
        log file name

    Returns
    -------
    dict
        Mapping of metadata key to metadata value (both ``str``). If a key
        occurs more than once in the file, the last occurrence wins.
    """
    with open(logfile, "r") as f:
        log_text = f.read()

    # Escape the tags so they are always matched literally, whatever
    # characters they are configured to contain.
    tag_open = re.escape(config_vars["metadata_tag_open"])
    tag_close = re.escape(config_vars["metadata_tag_close"])
    # Raw f-string: `\w` must reach the regex engine untouched (in a
    # non-raw string it is an invalid escape sequence).
    pattern = rf"{tag_open}(\w+):(\w+){tag_close}"

    return {
        match.group(1): match.group(2) for match in re.finditer(pattern, log_text)
    }
def get_installer_log_file_path(
    mpsd_release: str, cmd: str, root_dir: Union[str, Path]
) -> Union[Path, None]:
    """Return the path of the installer log file for this invocation.

    The file name is built by ``create_log_file_names`` from the release,
    the native microarchitecture and the command. The file lives in
    ``<root_dir>/<mpsd_release>/logs``; that folder is created if needed.

    For the info-only commands ``status`` and ``available`` nothing is
    created on disk and ``None`` is returned.

    Parameters
    ----------
    mpsd_release : str
        MPSD software stack version
    cmd : str
        command (action) being executed
    root_dir : str or Path
        root directory of the installation

    Returns
    -------
    Path or None
        path to the installer log file, or None for info-only commands
    """
    # Get machine configs
    microarch = get_native_microarchitecture()

    # decide the log_file_name
    installer_log_name = create_log_file_names(
        mpsd_release=mpsd_release, microarch=microarch, action=cmd
    )

    # info-only commands must not create the log folder
    if cmd in ["status", "available"]:
        return None

    # accept plain strings as well as Path objects for root_dir
    log_folder = Path(root_dir) / mpsd_release / "logs"
    # exist_ok avoids the race between an existence check and the creation
    os.makedirs(log_folder, exist_ok=True)
    return log_folder / installer_log_name
def set_up_logging(loglevel="warning", file_path=None):
"""Set up logging.
This function sets up the logging configuration for the script.
Parameters
----------
loglevel : str or int
Loglevels are:
- warning (default): only print statements if something is unexpected
- info (show more detailed progress)
- debug (show very detailed output)
- filename to save logging messages into
If loglevel is 'debug', save line numbers in log messages.
Returns
-------
None.
Logger instances are generally not passed around, but retrieved from the
logging module as shown below (they are singletons).
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
We provide two loggers:
1. log = logging.getLogger('')
This is the 'root' logger. It uses a RichHandler if rich is available for
output to the shell, otherwise plain text.
Typical use:
log.debug("...")
log.info("...")
log.warn("...")
Equivalent to
logging.debug("...")
logging.info("...")
2. print_log = logging.getlogger('print')
This uses the logging module to issue the message, but prints without
any further markup (i.e. no date, loglevel, line number, etc). Think
PRINT via the LOGging module.
We use this as a replacement for the print function (i.e. for messages
that should not be affected by logging levels, and which should always
be printed).
Typical and intended use:
print_log.info("Available toolchains are ...")
The major difference from the normal print command is that the output
will be send to the stdout (as for print) AND the file with name
filename, so that these messages appear in the log file together with
normal log output.
# convert loglevel string into loglevel as number
log_level_numeric = getattr(logging, loglevel.upper(), logging.WARNING)
if not isinstance(log_level_numeric, int):
# set up the main logger ("root" logger)
logger = logging.getLogger("")
# - "logger" logs everything
# - we use loglevel at handler level to write everything to file
# - and filter using log_level_numeric (as the user provides) to
# send logging messages to the console
logger.setLevel(0)
# the handler determines where the logs go: stdout/file
if rich_available:
# https://rich.readthedocs.io/en/stable/logging.html
shell_handler = rich.logging.RichHandler()
# rich handler provides metadata automatically:
logging_format = "%(message)s"
# for shell output, only show time (not date and time)
shell_formatter = logging.Formatter(logging_format, datefmt="[%X]")
else:
shell_handler = logging.StreamHandler()
# include line numbers in output if level is DEBUG
linenumbers = " %(lineno)4d" if log_level_numeric == logging.DEBUG else ""
logging_format = "%(asctime)s %(levelname)7s" + linenumbers + " | %(message)s"
shell_formatter = logging.Formatter(logging_format)
# here we hook everything together
shell_handler.setFormatter(shell_formatter)
# use the log_level_numeric to decide how much logging is sent to shell
shell_handler.setLevel(log_level_numeric)
# Here we set the handlers of the RootLogger to be just the one we want.
# The reason is that the logging module will add a <StreamHandler <stderr>
# (NOTSET)> handler if logging.info/logging.debug/... is used before we
# come across this line. And we do not want that additional handler.
logger.handlers = [shell_handler]
# if filename provided, write log messages to that file, too.
if file_path:
file_handler = logging.FileHandler(file_path)
# if we have a file, we write all information in there.
# We could change the level, for example restrict to only DEBUG and above with
# file_handler.setLevel(logging.DEBUG)
file_logging_format = "%(asctime)s %(levelname)7s %(lineno)4d | %(message)s"
file_formatter = logging.Formatter(file_logging_format, datefmt="[%X]")
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
#
# new logger for printing
#
print_log = logging.getLogger("print")
print_log.setLevel(logging.INFO)
print_log.propagate = False
# create formatter 'empty' formatter
formatter = logging.Formatter("%(message)s")
ch = logging.StreamHandler()
ch.setFormatter(formatter)
print_log.addHandler(ch)
# if filename provided, write output of print_log to that file, too
# create, format and add file handler
fh = logging.FileHandler(file_path)
fh.setFormatter(formatter)
print_log.addHandler(fh)
#
# short message
#
f"Logging has been setup, loglevel={loglevel.upper()} "
+ f"{file_path=} {rich_available=}"

Ashwin Kumar Karnad
committed
def get_available_toolchains(mpsd_release: str) -> List[str]:
    """Given a release, return the available toolchains.

    This is based on the spack-environment's repository [1]. For this function
    to succeed, we need to have Internet access etc.

    We use a temporary directory to clone the repository locally. The
    directory is removed when the function finishes, also if an error occurs.

    As a side effect, a summary (toolchains, package sets and the packages
    listed in each package set's ``global_packages.list``) is sent to the
    'print' logger.

    [1] https://gitlab.gwdg.de/mpsd-cs/spack-environments.git

    Parameters
    ----------
    mpsd_release : str
        name of the release; used as the branch to check out

    Returns
    -------
    toolchains : List[str]
        Names of all entries in the ``toolchains`` folder of the repository,
        as returned by ``os.listdir`` (i.e. in no particular order).

    Example
    -------
    >>> get_available_toolchains('dev-23a')
    ['foss2021a-cuda-mpi',
    'foss2021a-mpi',
    'foss2021a-serial',
    'foss2022a-cuda-mpi',
    'foss2022a-mpi',
    'foss2022a-serial',
    'global',
    'global_generic']
    """
    logging.debug(f"get_available_toolchains({mpsd_release=})")
    logging.info(f"Retrieving available toolchains for release {mpsd_release}")
    print_log = logging.getLogger("print")

    # The context manager guarantees removal of the temporary directory,
    # even if cloning or parsing fails.
    with tempfile.TemporaryDirectory(prefix="mpsd-software-available-") as tmp_dir:
        tmp_dir_path = Path(tmp_dir)
        toolchains_path = tmp_dir_path / "toolchains"

        # find toolchains by cloning repository and checking out right branch
        clone_repo(
            tmp_dir_path, config_vars["spack_environments_repo"], branch=mpsd_release
        )

        # look for directories defining the toolchains
        toolchains = os.listdir(toolchains_path)
        msg = f"Found toolchains {sorted(toolchains)}"
        logging.debug(msg)

        # the 'toolchains' split into toolchains (such as foss2022a-mpi) and
        # sets of packages. Here we split them into the two categories for a
        # more useful output:
        toolchain_list = [x.parent.name for x in toolchains_path.glob("*/spack.yaml")]
        package_sets = [x.parent.name for x in toolchains_path.glob("*/*.list")]
        logging.debug(f"{toolchain_list=}")
        logging.debug(f"{package_sets=}")

        # summarise toolchains found for use, and show packages provided for
        # each package_set:
        print_log.info(f"MPSD software release {mpsd_release}")
        print_log.info("Toolchains: \n " + "\n ".join(sorted(toolchain_list)))
        print_log.info("Package sets:")
        for package_set in package_sets:
            # keep only lines that start like "name@version", and strip the
            # compiler suffix ("%...") from the package spec
            list_file = toolchains_path / package_set / "global_packages.list"
            with open(list_file) as f:  # close the file handle deterministically
                packages = [
                    line.split()[0].split("%")[0]
                    for line in f
                    if re.match(r"^\w+@\w+", line)
                ]
            print_log.info(f" {package_set} ({', '.join(packages)}) ")

    return toolchains
# Helper class to change directory via context manager
class os_chdir:
"""The os_chdir class is a context manager.
It changes the current directory to a specified directory
and returns to the original directory after execution.
"""
"""Initialize, save original directory."""
self.new_dir = new_dir
self.saved_dir = os.getcwd()
"""Go to target directory (main action for context)."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""On exist we return to original directory."""
def run(*args, counter=[0], **kwargs):
    """
    Convenience function to call `subprocess.run` and provide some metadata
    about the call.

    Before and after the call, debug-level log messages are emitted (command,
    options, current working directory, execution time). Each message is
    prefixed with a token "##-NN" where NN is the running number of the call.

    Parameters
    ----------
    args : tuple
        passed on to subprocess.run(*args). Must contain exactly one
        element, which is either a string, for example ("ls -l"), or a list
        of strings, for example (["ls", "-l"]).
    counter : list, optional
        list with one integer, starting from [0].
        This is (a Python hack) to count the number of
        calls of this function, so the different calls of subprocess.run
        are easier to follow in the log files. The mutable default is
        intentional: it is the state shared between calls.
    kwargs : dict
        keyword-value arguments to be passed to subprocess.run. For example,
        `shell=True`.

    Returns
    -------
    process : subprocess.CompletedProcess
        CompletedProcess object as returned by `subprocess.run` .

    Raises
    ------
    NotImplementedError
        if the single positional argument is neither a string nor a list.

    Examples
    --------
    >>> run(['date', '+%Y-%m-%d'])
    2023-05-30
    CompletedProcess(args=['date', '+%Y-%m-%d'], returncode=0)

    >>> run(['date +%Y-%m-%d'], shell=True)
    2023-05-30
    CompletedProcess(args=['date +%Y-%m-%d'], returncode=0)
    """
    # token is printed in front of every meta-data line - useful for
    # searching the logs. Starts with "##-00", then "##-01", ...
    token = f"##-{counter[0]:02d}"
    counter[0] += 1  # increase counter

    # make command nicely readable: ["ls", "-l"] -> "ls -l"
    assert len(args) == 1
    arg = args[0]

    # either args is a tuple containing a string | Example: ('ls -1',)
    if isinstance(arg, str):
        command = arg
    # or we have a tuple containing a list of strings.
    # Example: (['ls', '-1'],)
    elif isinstance(arg, list):
        command = " ".join(arg)
    else:
        # we do not expect this to happen
        raise NotImplementedError(f"{arg=}, {args=}")

    # make options (such as `shell=True`) nicely readable
    options = ", ".join(f"{key}={value}" for key, value in kwargs.items())

    # provide information about upcoming subprocess.run call
    logging.debug(
        f"{token} Starting subprocess.run('{command}') with options {options}"
    )
    logging.debug(f"""{token} getcwd={os.getcwd()}""")
    logging.debug(f"""{token} subprocess.run("{arg}")""")

    time_start = time.time()
    process = subprocess.run(*args, **kwargs)
    execution_time = time.time() - time_start

    logging.debug(f"{token} Completed in {execution_time:.4f}s.")
    logging.debug(f"{token}")  # near-empty line to make reading logs easier

    # callers inspect returncode / stdout of the completed process
    return process
def record_script_execution_summary(
mpsd_release: str, root_dir: str, msg: str = None, **kwargs
Log the command used to build the toolchains.
It also logs information about the software environment installer branch,
the Spack environments branch, and the commit hashes of each.
It also logs steps taken
in the install process using the optional message argument.
Parameters
----------
- mpsd_release : str
The name of the release to install toolchains for.
The path to the directory where the scripts are located.
- msg : str, optional
An optional message to log in the command log file.
The name of the Spack environments branch.
- spe_commit_hash : str
The commit hash of the Spack environments branch.
Returns
-------
- None
release_base_dir = root_dir / mpsd_release
# Write to the log file with the following format
# --------------------------------------------------
# 2023-02-29T23:32:01, install-software-environment.py --release 23b --install ALL
# Software environment installer branch: script_branch (commit hash: \
# script_commit_hash)
# Spack environments branch: dev-23a (commit hash: spe_commit_hash)
# MSGs
with os_chdir(release_base_dir):
with open(config_vars["cmd_log_file"], "a") as f:
if msg:
# Write the message to the log file
f.write(msg + "\n")
else:
# Write the header
f.write("-" * 50 + "\n")
# Gather data to log
# call statement:
cmd_line = " ".join(sys.argv)
# script branch and commit hash
["git", "rev-parse", "--abbrev-ref", "HEAD"],
stdout=subprocess.PIPE,
.stdout.decode()
.strip()
)
script_commit_hash = (
["git", "rev-parse", "--short", "HEAD"],
stdout=subprocess.PIPE,
.stdout.decode()
.strip()
# spack-environments branch and commit hash from kwargs
spe_branch = kwargs.get("spe_branch", None)
spe_commit_hash = kwargs.get("spe_commit_hash", None)
# Write to log file
f.write(f"{datetime.datetime.now().isoformat()}, {cmd_line}\n")
f"Software environment installer branch: {script_branch} "
f"(commit hash: {script_commit_hash})\n"
f"Spack environments branch: {spe_branch} "
f"(commit hash: {spe_commit_hash})\n"
def clone_repo(
target_path: Path, repo_url: str, branch=None, capture_output=True
) -> None:
"""Clone repo locally. Optionally checkout a branch.
target_path : Path
Where to check the repository out to
repo_url: str
where to clone the git repository from
branch: str (defaults to None)
if provided, checkout this branch after cloning
capture_output: bool (defaults to True)
capture output, i.e. do not send it to stdout.
if not target_path.exists():
target_path.mkdir()
["git", "clone", repo_url, str(target_path)],
capture_output=capture_output,
)
if branch:
with os_chdir(target_path):
# Git fetch and checkout the release branch and git pull
# to be sure that the resulting repo is up to date
run(["git", "fetch", "--all"], check=True, capture_output=capture_output)
checkout_result = run(
["git", "checkout", branch], capture_output=capture_output
)
if checkout_result.returncode != 0:
msg = f"Couldn't find {branch=}\n"
branches_result = run(
["git", "branch", "-a"], check=True, capture_output=True
branches_list = branches_result.stdout.decode().split("\n")
# strip off 'remotes/origin' (needs Python 3.9):
branches_list = [
b.strip().removeprefix("remotes/origin/") for b in branches_list
]
msg += f"Available branches are {branches_list}"
logging.error(msg)
raise Exception(msg, branches_result)
else:
run(["git", "pull"], check=True, capture_output=capture_output)
def get_release_info(mpsd_release: str, root_dir: Path) -> Tuple[str, str, List[str]]:
Get information about the specified release.
Get information about the specified release, such as the branch and commit hash
of the Spack environments repository and the available toolchains.
Parameters
----------
The name of the release to get information for.
The base directory where releases are stored.
Returns
-------
The name of the branch for the Spack environments repository.
The commit hash for the Spack environments repository.
available_toolchains : list
A list of strings representing the available toolchains for the release.
Raises
------
If the release directory does not exist.
# Get the info for release
release_base_dir = root_dir / mpsd_release
if not os.path.exists(release_base_dir):
logging.debug(f"get_release_info({mpsd_release=}, {root_dir=})")
raise FileNotFoundError(
with os_chdir(release_base_dir):
with os_chdir("spack-environments"):
# Get the branch and commit hash of the spack-environments repo
spe_commit_hash = (
run(["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE, check=True)
spe_branch = (
["git", "rev-parse", "--abbrev-ref", "HEAD"],
stdout=subprocess.PIPE,
check=True,
available_toolchains = os.listdir("toolchains")
return spe_branch, spe_commit_hash, available_toolchains
def prepare_environment(mpsd_release: str, root_dir: Path) -> List[str]:
Create the directory structure for the given MPSD release.
It does the following steps:
Clones the spack-environments repository.
Determines the branch and commit hash of the spack-environments repository
and the available toolchains.
Parameters
----------
mpsd_release : str
The name of the MPSD release to prepare the environment for.
The base directory to create the release folder and
clone the spack-environments repository into.
Returns
-------
available_toolchains : list
A list of available toolchains for the given MPSD release.
# Creates the directory structure for the specified release and clone the
# Spack environments repository if it doesn't exist:
# Create the directory structure for the release
release_base_dir = root_dir / mpsd_release
release_base_dir.mkdir(parents=True, exist_ok=True)
repo_path = release_base_dir / "spack-environments"
if repo_path.exists():
logging.debug(f"directory {repo_path} exists already, not touching")
logging.debug(
"XXX TODO: should we run a git pull here to get the latest version? XXX"
)
else:
repo_url = config_vars["spack_environments_repo"]
logging.info(f"cloning repository {repo_path} from {repo_url}")
clone_repo(repo_path, repo_url, branch=mpsd_release)
logging.getLogger("print").info(
f"Release {mpsd_release} is prepared in {release_base_dir}"
)
spe_branch, spe_commit_hash, available_toolchains = get_release_info(
record_script_execution_summary(
mpsd_release, root_dir, spe_branch=spe_branch, spe_commit_hash=spe_commit_hash
return available_toolchains
def get_native_microarchitecture():
    """Return native microarchitecture.

    The environment variable "MPSD_MICROARCH" is consulted first. If it is
    not set, the command 'archspec cpu' is executed and its output is used.
    If 'archspec' cannot be found, the user is asked to install it and the
    program exits with status 1.

    The microarchitecture that was determined is also recorded in the log
    through ``log_metadata``.

    Returns
    -------
    microarch : str
        name of the microarchitecture

    Raises
    ------
    ValueError
        if 'archspec cpu' was executed but returned a non-zero exit code.

    Example
    -------
    >>> get_native_microarchitecture()
    'haswell'
    """
    microarch = os.environ.get("MPSD_MICROARCH", "UNKNOWN_MICROARCH")

    # Environment variable is defined: nothing else to work out.
    if microarch != "UNKNOWN_MICROARCH":
        log_metadata("microarchitecture", microarch)
        return microarch

    # Fall back to asking archspec.
    logging.debug(
        "Couldn't find MPSD_MICROARCH environment variable. Will try archspec."
    )
    try:
        process = run(["archspec", "cpu"], stdout=subprocess.PIPE, text=True)
    except FileNotFoundError as e:
        # Presumably 'archspec' is not installed.
        logging.debug(f"Call of 'archspec cpu' failed: {e=}")
        msg = (
            "Please install archspec, for example via 'pipx install archspec'.\n"
            "The command we need to execute is 'archspec cpu'.\n"
            "Documentation of package: https://archspec.readthedocs.io/"
        )
        logging.error(msg)
        sys.exit(1)

    # archspec was found and executed; check that it succeeded
    if process.returncode != 0:
        raise ValueError(
            f"Some error occurred when calling 'archspec cpu': {process=}"
        )

    microarch = process.stdout.strip()
    logging.debug(f"Found microarchitecture from 'archspec cpu' to be '{microarch}'")
    assert len(microarch) > 0  # sanity check

    log_metadata("microarchitecture", microarch)
    return microarch
def install_environment(
mpsd_release: str,
toolchains: List[str],
force_reinstall: bool = False,
enable_build_cache: bool = False,
) -> None:
Install the specified MPSD release and toolchains.
The function installs the toolchain to the specified directory, using Spack.
Parameters
----------
mpsd_release : str
A string representing the MPSD release version.
toolchains : list of str
A list of strings representing the toolchains to install
(e.g., "foss2021a-mpi", "global_generic", "ALL").
A Path object representing the path to the directory where
the release and toolchains will be installed.
force_reinstall : bool, optional
A boolean indicating whether to force a reinstallation
even if the release and toolchains already exist. Defaults to False.
enable_build_cache : bool, optional
A boolean indicating whether to build the build cache
when installing toolchains. Defaults to False.
Raises
------
ValueError
If a requested toolchain is not available in the specified release.
Returns
-------
None
logging.info(
f"Installing release {mpsd_release} with toolchains {toolchains} "
release_base_dir = root_dir / mpsd_release
microarch = get_native_microarchitecture()
toolchain_dir = release_base_dir / microarch
toolchain_dir.mkdir(parents=True, exist_ok=True)
spack_setup_script = release_base_dir / "spack-environments" / "spack_setup.sh"
install_flags = []
install_flags.append("-b")
# run the prepare_environment function
available_toolchains = prepare_environment(mpsd_release, root_dir)
# Ensure that the requested toolchains are available in the release
if toolchains == "ALL":
toolchains = available_toolchains
elif toolchains == "NONE":
# No toolchains requested, so we only create the env and print the
# list of available toolchains
logging.warning(
"No toolchains requested. Available toolchains for release "
f"{mpsd_release} are: \n {available_toolchains}"
print_log = logging.getLogger("print")
print_log.info(f"{available_toolchains=}")
return
for toolchain in toolchains:
if toolchain not in available_toolchains:
# TODO: add to message how toolchains can be found
msg = f"Toolchain '{toolchain}' is not available in release {mpsd_release}."
logging.error(msg)
sys.exit(1)
# Install the toolchains
with os_chdir(toolchain_dir):
# run spack_setup_script with the toolchains as arguments
# if the log folder doesn't exist, create it
if not os.path.exists("logs"):
os.mkdir("logs")
# Set the install log file name from create_log_file_names
build_log_file_name = create_log_file_names(
mpsd_release, microarch, "install", toolchain=toolchain
build_log_folder = release_base_dir / "logs"
build_log_path = build_log_folder / build_log_file_name
# if the logs folder doesn't exist, create it
if not os.path.exists(build_log_folder):
os.makedirs(build_log_folder)
logging.info(f"Installing toolchain {toolchain} to {toolchain_dir}")
record_script_execution_summary(
msg=f"installing {toolchain} and logging at {build_log_path}",
record_script_execution_summary(
f"CMD: bash {spack_setup_script} {' '.join(install_flags)} "
f"{toolchain}"
f"bash {spack_setup_script} {' '.join(install_flags)} {toolchain} 2>&1 "
f"| tee -a {build_log_path} ",
def remove_environment(release, toolchains, target_dir):
    """Remove toolchains of a release (not implemented yet).

    Logs the requested removal at INFO level and then raises
    NotImplementedError carrying the same message.
    """
    description = (
        f"Removing release {release} with toolchains {toolchains} "
        f"from {target_dir}"
    )
    logging.info(description)
    raise NotImplementedError(description)
def start_new_environment(release, from_release, target_dir):
    """Start new MPSD software environment version (not implemented yet).

    Logs the request at INFO level and then raises NotImplementedError
    carrying the same message.
    """
    description = (
        f"Starting new release {release} from {from_release} "
        f"to {target_dir}"
    )
    logging.info(description)
    raise NotImplementedError(description)
def environment_status(mpsd_release: str, root_dir: Union[str, Path]) -> dict:
"""Show status of release in installation.
Parameters
----------
mpsd_release : str
A string representing the MPSD release version.
root_dir : pathlib.Path
A Path object pointing to the root directory of the installation.
Expect a subfolder root/mpsd_release in which we search for the
toolchains.
Returns
-------
toolchain_map : dict
A dictionary containing available microarchitectures as keys and
a list of available toolchains as values for each microarchitecture.
"""
msg = f"Showing status of release {mpsd_release} in {root_dir}"
plog = logging.getLogger("print")
microarch = get_native_microarchitecture()
toolchain_dir = release_base_dir / microarch
spack_dir = toolchain_dir / "spack"
# if the mpsd_release does not exist:
if not release_base_dir.exists():
logging.debug(f"Directory {str(release_base_dir)} does not exist.")
logging.error(f"MPSD release '{mpsd_release}' is not installed.")
return None
# if the mpds_release directory exists but the spack repository is not fully
# cloned - indicates some kind of incomplete installation:
logging.debug(f"Looking for files in {spack_dir}")
f"MPSD release '{mpsd_release}' has not been completely installed."
# find all folders for all microarch in the release directory
# except for the blacklisted files
black_listed_files = [
config_vars["cmd_log_file"],
"spack-environments",
"logs",
"mpsd-spack-cache",
]