# Skip to content
# Snippets Groups Projects
# test_mpsd_software.py 24.69 KiB
"""Tests for mpsd-software-environment.py."""

import importlib
import os
import shutil
import subprocess
from pathlib import Path
import logging
import datetime
import sys

import pytest

# Import the module under test by its dotted name. All tests below reach the
# code under test through the module object `mod`.
mod = importlib.import_module("mpsd_software_manager.mpsd_software")

# set loglevel to debug - useful for understanding problems.
# (if the tests pass, pytest doesn't show any output)
# NOTE: this runs when this test module is imported, and logs to the relative
# path "tests.log" (presumably resolved against the current working directory
# - confirm in set_up_logging).
mod.set_up_logging(loglevel="debug", file_path="tests.log")
logging.debug(f"We have set up logging from {__file__}")


def create_mock_git_repository(target_directory, create_directory=True):
    """
    Create a git repository with one commit in the directory `target_directory`.

    The repository contains a single committed file `readme.txt`.

    Arguments
    ---------
    target_directory : pathlib.Path
      - path at which the root of the repository should be located (i.e. `.git` folder)

    create_directory : bool
      - create `target_directory` and parent directories if True

    Raises
    ------
    FileExistsError
      - if `create_directory` is True and `target_directory` exists already

    subprocess.CalledProcessError
      - if one of the git commands fails

    """
    target_directory = Path(target_directory)

    # create directory first
    if create_directory:
        target_directory.mkdir(parents=True)

    def git(*arguments):
        """Run `git <arguments>` in the repository directory; raise on failure."""
        # Passing a list (no shell) avoids quoting problems; `cwd` replaces the
        # need to change the working directory of the test process.
        subprocess.run(["git", *arguments], cwd=target_directory, check=True)

    # then create git repository:
    git("init", ".")
    (target_directory / "readme.txt").write_text("fake content\n")
    git("add", "readme.txt")

    # if email and username are not available (such as on naked test container),
    # git may complain. We set a temporary user for this one commit to work around
    # that.
    git(
        "-c",
        "user.name=Tes Ta",
        "-c",
        "user.email=tester@some-ci.org",
        "commit",
        "-m",
        "first commit",
        "readme.txt",
    )


def test_os_chdir(tmp_path):
    """Check that os_chdir enters a directory and restores the previous one."""
    # remember where we are before entering the context manager
    start_dir = os.getcwd()

    # a fresh directory to change into
    target = tmp_path / "test_os_chdir"
    target.mkdir()

    with mod.os_chdir(str(target)):
        # inside the context we must be in the target directory
        assert str(target) == os.getcwd()

    # after leaving the context we must be back where we started
    assert start_dir == os.getcwd()


def test_run_method(tmp_path):
    """Run tests for run method.

    Checks list and shell-string commands, file system side effects, captured
    output, and the behaviour for missing commands and non-zero exit codes
    (with and without `check=True`).
    """
    run = mod.run

    # test a command with options:
    assert run(["date", "+%Y-%m-%d"]).returncode == 0
    assert run("date +%Y-%m-%d", shell=True).returncode == 0

    # tests interacting with the file system
    with mod.os_chdir(str(tmp_path)):
        # ensure single string command works
        assert run("ls -l", shell=True).returncode == 0
        # test that multiple list arguments are handled correctly:
        assert run(["touch", "file1", "file2"]).returncode == 0
        assert os.path.exists("file1")
        assert os.path.exists("file2")
        # test output is captured:
        completed = run(["echo", "Hello, world!"], capture_output=True)
        assert b"Hello, world!\n" in completed.stdout

    # check exceptions
    with pytest.raises(FileNotFoundError):
        run(["doesnotexistcommand"])

    # check error code is checked
    # 1. expect this to pass: return code is non-zero, but without `check=True`
    #    no exception is raised
    assert run(["ls", "/doesnotexist"]).returncode != 0
    # 2. expect this to fail:
    with pytest.raises(subprocess.CalledProcessError):
        run(["ls", "/doesnotexist"], check=True)


def test_prepare_environment(tmp_path):
    """Test preparation of an environment via prepare_environment.

    Calls `prepare_environment` for release dev-23a with a fresh root directory
    and checks the created release directory, the branch checked out in
    `spack-environments`, and the returned list. Finally checks that an unknown
    release raises an exception.
    """
    root_dir = tmp_path / "mpsd_opt" / "linux_debian_11"
    spack_environments = "spack-environments"
    mpsd_release_to_test = "dev-23a"
    release_base_dir = root_dir / mpsd_release_to_test
    # check that the test directory does not exist
    assert not root_dir.exists()

    # prepare_environment expects to be executed in git repository
    # (mpsd-software-environments). It queries the commit on which we are to
    # log that information. For this to work, we need to execute the command
    # within a directory tree that has a git repository at the same or higher
    # level. Let's create one:
    create_mock_git_repository(root_dir)

    # now call the function we want to test
    result = mod.prepare_environment(
        mpsd_release=mpsd_release_to_test, root_dir=root_dir
    )

    # check if the directory now is created
    assert release_base_dir.exists()
    # check for spack-environments directory
    assert spack_environments in os.listdir(release_base_dir)

    # check if the git branch is correctly checked out. We expect output such as
    # git_branch_stdout = '* releases/dev-23a\n  develop\n'
    # The entry with the '* ' prefix is the active branch.
    # Running git with `cwd` (instead of a `cd ... &&` shell string) is safe for
    # paths with special characters; `check=True` reports a failing git call
    # directly instead of via an assertion on empty output.
    git_branch_output_raw = subprocess.run(
        ["git", "branch"],
        cwd=release_base_dir / spack_environments,
        capture_output=True,
        check=True,
    )
    git_branch_stdout = git_branch_output_raw.stdout.decode("utf-8")
    assert f"* releases/{mpsd_release_to_test}" in git_branch_stdout

    # check that result is a list and contains at least ['global','foss2021a-mpi']
    assert isinstance(result, list)
    assert "global" in result
    assert "foss2021a-mpi" in result

    # Expect an Exception when wrong mpsd_release is provided
    with pytest.raises(Exception):
        mod.prepare_environment(mpsd_release="wrong-mpsd-release", root_dir=root_dir)


def test_write_to_cmd_log(tmp_path):
    """Check that write_to_cmd_log puts the message into the cmd log file."""
    # location of the cmd log file inside the root directory
    log_path = tmp_path / mod.config_vars["cmd_log_file"]

    mod.write_to_cmd_log(root_dir=tmp_path, msg="test_cmd")

    assert os.path.exists(log_path)
    with open(log_path, "r") as handle:
        content = handle.read()
    assert "test_cmd" in content


def test_record_script_execution_summary(tmp_path):
    """Check that cmd log is updated with header

    Initialises an MPSD software instance in a fresh root directory and checks
    that the cmd log file in that directory grows and ends with the
    initialisation message and the version of the software manager.
    """
    cmd_log_file = mod.config_vars["cmd_log_file"]

    root_dir = tmp_path / "test_prepare_env"
    cmd_log_path = root_dir / cmd_log_file
    script_version = mod.__version__

    # size of the log file before we do anything (the file is looked up inside
    # root_dir, not relative to the current working directory)
    if cmd_log_path.exists():
        initial_bytes = os.path.getsize(cmd_log_path)
    else:
        initial_bytes = 0

    # run the init functionality to check the creation of log file
    create_mock_git_repository(target_directory=root_dir, create_directory=True)
    mod.initialise_environment(root_dir=root_dir)

    # check that the cmd log file is updated
    assert cmd_log_path.exists()
    assert os.path.getsize(cmd_log_path) > initial_bytes

    # Check that the last two lines of the log file report the initialisation
    # of this root directory and the version of the software manager
    with open(cmd_log_path, "r") as f:
        lines = f.readlines()
    assert f"Initialising MPSD software instance at {root_dir}" in lines[-2]
    assert f"MPSD Software manager version: {script_version}" in lines[-1]


def test_install_environment_wrong_package_set(tmp_path):
    """Check that an unknown package_set makes install_environment exit."""
    arguments = {
        "mpsd_release": "dev-23a",
        "package_sets": ["wrong-package_set"],
        "root_dir": tmp_path,
    }

    # we expect sys.exit with exit code 20 for wrong package_sets
    with pytest.raises(SystemExit) as excinfo:
        mod.install_environment(**arguments)

    assert excinfo.type == SystemExit
    assert excinfo.value.code == 20


def test_install_environment_wrong_mpsd_release(tmp_path):
    """Check that an unknown mpsd release makes install_environment raise."""
    arguments = {
        "mpsd_release": "wrong-mpsd-release",
        "package_sets": ["foss2021a-mpi"],
        "root_dir": tmp_path,
    }

    # The exception is expected to come from the preparation step
    # (prepare_environment), which cannot find the release.
    with pytest.raises(Exception):
        mod.install_environment(**arguments)


def _replace_in_file(path, old, new):
    """Replace every occurrence of `old` by `new` in the text file at `path`."""
    with open(path, "r") as f:
        content = f.read()
    with open(path, "w") as f:
        f.write(content.replace(old, new))


@pytest.mark.skipif(sys.platform == "darwin", reason="install not working on OSX")
def test_install_environment_zlib():
    """Test installation of package_set.

    Prepares release dev-23a, patches the `global_generic` package set so that
    it only contains zlib, installs it twice and checks the install log files,
    the cmd log file and the lmod module index.

    This is a long test. It is handy to run it with output printed to stdout:

        pytest -s
    """
    # for this installation avoid tmp_path as
    # the length of the path becomes too long and spack complains
    root_dir = Path("/tmp/test_global_generic")
    if root_dir.exists():
        shutil.rmtree(root_dir)
    root_dir.mkdir(exist_ok=True, parents=True)
    mpsd_release_to_test = "dev-23a"
    package_set_to_test = "global_generic"
    cmd_log_file = mod.config_vars["cmd_log_file"]
    microarch = mod.get_native_microarchitecture()
    release_base_dir = root_dir / mpsd_release_to_test
    spack_environments_dir = release_base_dir / "spack-environments"

    def find_install_logs():
        """Return the sorted install log files for this release and microarch."""
        pattern = f"{mpsd_release_to_test}_{microarch}_*_install.log"
        return sorted((release_base_dir / "logs").glob(pattern))

    def install():
        """Set up logging to the install log file and install the package set."""
        mod.set_up_logging(
            "WARNING",
            mod.get_log_file_path(mpsd_release_to_test, "install", root_dir),
        )
        mod.install_environment(
            mpsd_release=mpsd_release_to_test,
            package_sets=[package_set_to_test],
            root_dir=root_dir,
            enable_build_cache=False,
        )

    create_mock_git_repository(target_directory=root_dir, create_directory=False)
    mod.prepare_environment(mpsd_release=mpsd_release_to_test, root_dir=root_dir)

    # Patch the spack environments to create a fake global_generic:
    # make zlib the only spec of global_generic
    package_set_src_dir = spack_environments_dir / "toolchains"
    with open(
        package_set_src_dir / "global_generic" / "global_packages.list", "w"
    ) as f:
        f.write("zlib@1.2.13 \n")

    # add zlib to whitelist of module creation file by replacing anaconda3%gcc@10.2.1
    # with zlib@1.2.13
    _replace_in_file(
        spack_environments_dir / "spack_overlay/etc/spack/modules.yaml",
        "anaconda3%gcc@10.2.1",
        "zlib@1.2.13",
    )

    # Replace gcc@10.2.1 with the available system gcc (for testing on laptop).
    # The command runs through the shell so that a missing gcc results in empty
    # output and the assertion message below.
    gcc_ver = (
        subprocess.run("gcc -dumpfullversion", shell=True, capture_output=True)
        .stdout.decode("utf-8")
        .strip()
    )
    assert len(gcc_ver) > 3, f"Couldn't find gcc {gcc_ver=}"

    _replace_in_file(
        spack_environments_dir / "spack_setup.sh",
        'system_compiler="gcc@10.2.1"',
        f'system_compiler="gcc@{gcc_ver}"',
    )

    # install global_generic package_set
    install()

    # test that the log files are created correctly: we expect two files
    # matching the glob in release_base_dir/logs. In sorted order the first is
    # expected to be the APEX log and the second the BUILD log.
    log_files = find_install_logs()
    assert len(log_files) == 2
    apex_log, build_log = log_files
    assert "APEX" in str(apex_log)
    assert "BUILD" in str(build_log)

    # check that the build log contains statement ##### Installation finished
    with open(build_log, "r") as f:
        assert "##### Installation finished" in f.read()

    # assert that APEX log file points to the build log file
    with open(apex_log, "r") as f:
        assert (
            f"> Logging installation of {package_set_to_test} at {build_log}"
            in f.read()
        )

    # assert that cmd log files exists
    assert os.path.exists(root_dir / cmd_log_file)

    # assert that the mpsd release is written to the cmd log file
    with open(root_dir / cmd_log_file, "r") as f:
        cmd_log_content = f.read()
    assert (
        f"Spack environments branch: releases/{mpsd_release_to_test}"
        in cmd_log_content
    )
    # TODO: check for f"> logging to {apex_log}" in the cmd log file; this has
    # to be tested when main() is called, i.e. via the CLI

    # assert that the module files are created correctly
    assert os.path.exists(release_base_dir / microarch)
    assert os.path.exists(release_base_dir / microarch / "lmod")
    # assert that lmod/module-index.yaml contains zlib
    with open(release_base_dir / microarch / "lmod" / "module-index.yaml", "r") as f:
        assert "zlib" in f.read()

    # install again to ensure that
    # commands that skip creation of folders when
    # they are already present works as expected
    # reload the module to ensure that date changes
    importlib.reload(mod)
    install()
    # the second installation adds two more log files
    assert len(find_install_logs()) == 4

    # TODO: test that the removal works once remove_environment is available:
    #   mod.remove_environment(
    #       mpsd_release=mpsd_release_to_test,
    #       package_sets=[package_set_to_test],
    #       root_dir=root_dir,
    #   )
    # and ensure that the module files are removed


def test_metadata_logging(tmp_path):
    """Test that metadata is logged and read correctly.

    Writes key-value pairs with `log_metadata` (interleaved with ordinary log
    messages), checks that the tagged entries appear in the log file, and
    checks that `read_metadata_from_logfile` returns exactly these pairs.
    """
    # Test that the metadata is logged correctly
    filename = tmp_path / "test-metadata.log"
    print(f"Writing to {filename}")
    mod.set_up_logging(loglevel="debug", file_path=filename)

    # our test data
    keys = ["important_key", "important_key2"]
    values = ["important_value", "important_value2"]

    # tags that enclose a metadata entry in the log file
    open_tag = mod.config_vars["metadata_tag_open"]
    close_tag = mod.config_vars["metadata_tag_close"]

    expected_log_entries = []
    for key, value in zip(keys, values):
        mod.log_metadata(key, value)
        expected_log_entries.append(f"{open_tag}{key}:{value}{close_tag}")
        # add unrelated messages so that the parser has to skip over them
        logging.info(f"Add some other info (after adding {key=})")
        logging.debug("Add some other info")
        logging.warning("Add some other info")

    # Check that relevant lines show up in the log file somewhere
    with open(filename, "r") as f:
        logfile_content = f.read()
    for expected_log in expected_log_entries:
        assert expected_log in logfile_content

    # Test that the metadata is read correctly using our parser
    read_dict = mod.read_metadata_from_logfile(filename)

    # check all entries are in the file (with the value that was written)
    for key, value in zip(keys, values):
        assert read_dict[key] == value

    # check no additional entries are there
    assert len(read_dict) == len(keys)


def test_get_available_package_sets():
    """
    Check the package_sets reported for release dev-23a.

    Needs internet access to succeed.
    """
    expected = [
        "foss2021a-cuda-mpi",
        "foss2021a-mpi",
        "foss2021a-serial",
        "foss2022a-cuda-mpi",
        "foss2022a-mpi",
        "foss2022a-serial",
        "global",
        "global_generic",
    ]
    reported = mod.get_available_package_sets("dev-23a")
    # compare sorted lists so that the order of the report does not matter
    assert sorted(reported) == sorted(expected)


def test_create_log_file_name():
    """Check the names that create_log_file_name generates."""
    make_name = mod.create_log_file_name
    release = "dev-23a"
    action = "install"
    package_set = "foss2021a"
    microarch = mod.get_native_microarchitecture()
    timestamp = datetime.datetime.now().replace(microsecond=0).isoformat()
    # arguments shared by all calls below
    common = {"mpsd_release": release, "date": timestamp}

    # with a package_set we expect the name of a build log file
    expected_build_name = (
        f"{release}_{microarch}_{timestamp}_BUILD_{package_set}_{action}.log"
    )
    build_name = make_name(action=action, package_set=package_set, **common)
    assert build_name == expected_build_name

    # without a package_set we expect the name of the installer (APEX) log file
    expected_installer_name = f"{release}_{microarch}_{timestamp}_APEX_{action}.log"
    installer_name = make_name(action=action, **common)
    assert installer_name == expected_installer_name

    # there is no build log file for the action "status"
    status_name = make_name(action="status", package_set=package_set, **common)
    assert status_name is None


def create_fake_environment(tmp_path, mpsd_release, expected_toolchain_map=None):
    """Create a fake environment with toolchains for testing.

    For every microarchitecture in `expected_toolchain_map` (a mapping from
    microarchitecture to a list of toolchain names) the folders
    `lmod/Core/toolchains`, `spack` and `logs` are created below
    `tmp_path / mpsd_release / <microarch>`, and an empty `<toolchain>.lua`
    file is created for every toolchain.

    If no (or an empty) map is given, the native microarchitecture with the
    toolchains foss2021a and intel2021a is used.

    Returns the toolchain map that was used.
    """
    if not expected_toolchain_map:
        expected_toolchain_map = {
            mod.get_native_microarchitecture(): ["foss2021a", "intel2021a"]
        }

    release_dir = tmp_path / mpsd_release
    for microarch, toolchains in expected_toolchain_map.items():
        microarch_dir = release_dir / microarch
        lmod_toolchains_dir = microarch_dir / "lmod" / "Core" / "toolchains"

        # folders of the fake installation
        for folder in (
            lmod_toolchains_dir,
            microarch_dir / "spack",
            microarch_dir / "logs",
        ):
            folder.mkdir(parents=True, exist_ok=True)

        # one (empty) module file per toolchain
        for toolchain in toolchains:
            (lmod_toolchains_dir / f"{toolchain}.lua").touch()

    return expected_toolchain_map


def test_environment_status(tmp_path):
    """Check the toolchain map reported by environment_status."""
    # a release that has not been installed has no status
    assert mod.environment_status("fake-release", tmp_path) is None

    release = "dev-23a"
    expected = create_fake_environment(tmp_path, release)

    # check that the environment status reports the fake toolchains
    reported = mod.environment_status(release, tmp_path)
    for microarch, toolchains in expected.items():
        # compare as sets: the order of the toolchains doesn't matter
        assert set(reported[microarch]) == set(toolchains)


@pytest.mark.skip(reason="not implemented yet")
def test_remove_environment(tmp_path):
    """Check removal of an environment via remove_environment."""
    release = "dev-23a"

    # a fake environment must be reported by environment_status
    create_fake_environment(tmp_path, release)
    assert mod.environment_status(release, tmp_path) is not None

    # removal without package sets is expected to exit (SystemExit)
    create_fake_environment(tmp_path, release)
    with pytest.raises(SystemExit):
        mod.remove_environment(release, tmp_path, force_remove=True)

    # removal of the complete environment: no status is reported afterwards
    mod.remove_environment(release, tmp_path, ["ALL"], force_remove=True)
    assert mod.environment_status(release, tmp_path) is None

    # the logs folder of the release must remain
    assert (tmp_path / release / "logs").exists()

    # removal of a single toolchain:
    # done in test_install_environment_zlib


def test_initialise_environment(tmp_path):
    """Check that initialise_environment creates the init file and logs it."""
    mod.initialise_environment(tmp_path)

    # the init file must exist in the root directory
    init_file = tmp_path / mod.config_vars["init_file"]
    assert init_file.exists()

    # the cmd log file must mention the initialisation of this directory
    log_file = tmp_path / mod.config_vars["cmd_log_file"]
    with open(log_file, "r") as handle:
        log_content = handle.read()
    assert f"Initialising MPSD software instance at {tmp_path}" in log_content

    # initialising the same directory again must exit with exit code 30
    with pytest.raises(SystemExit) as excinfo:
        mod.initialise_environment(tmp_path)
    assert excinfo.type == SystemExit
    assert excinfo.value.code == 30


def test_get_root_dir(tmp_path):
    """Check that get_root_dir finds the root directory."""
    with mod.os_chdir(tmp_path):
        # without an initialised root directory the function exits with code 40
        with pytest.raises(SystemExit) as excinfo:
            mod.get_root_dir()
        assert excinfo.type == SystemExit
        assert excinfo.value.code == 40

        # after initialisation the current directory is the root directory
        mod.initialise_environment(tmp_path)
        assert mod.get_root_dir() == tmp_path

        # from a subdirectory, the root directory of the parent is found
        nested = tmp_path / "sub_dir"
        nested.mkdir()
        with mod.os_chdir(nested):
            assert mod.get_root_dir() == tmp_path

        # once the subdirectory is initialised, it becomes the root directory
        with mod.os_chdir(nested):
            mod.initialise_environment(nested)
            assert mod.get_root_dir() == nested


def test_get_available_releases():
    """Check that the available releases are strings and include dev-23a."""
    releases = mod.get_available_releases()
    assert "dev-23a" in releases
    assert len(releases) >= 1
    assert all(isinstance(release, str) for release in releases)


def test_argument_parsing_logic(mocker):
    """Test to find errors in argparse logic.

    Strategy:

    In each of the tests below, we are setting the sys.argv to simulate the
    input from the command line, and in each instance, we ensure that the
    mocked function get the arguments as expected. The function is mocked not
    to carry out any activity.

    sys.argv is replaced through the `mocker` fixture, so that the original
    value is restored when the test finishes and other tests are not affected.
    """
    module_path = "mpsd_software_manager.mpsd_software"

    def set_argv(*arguments):
        """Simulate the command line `mpsd-software-tests <arguments>`."""
        mocker.patch.object(sys, "argv", ["mpsd-software-tests", *arguments])

    # pretend we have a rootdir defined
    mocker.patch(f"{module_path}.get_root_dir", return_value=Path("."))

    ### init
    set_argv("init")
    mock = mocker.patch(f"{module_path}.initialise_environment", return_value=None)
    with pytest.raises(SystemExit):
        mod.main()
    call_argument = mock.call_args[0][0]
    assert isinstance(call_argument, Path)

    ### available
    set_argv("available")
    mocker.patch(f"{module_path}.get_available_releases", return_value=None)
    with pytest.raises(SystemExit):
        mod.main()

    set_argv("available", "dev-23a")
    mock = mocker.patch(
        f"{module_path}.get_available_package_sets", return_value=None
    )
    mod.main()
    call_argument = mock.call_args[0][0]
    assert call_argument == "dev-23a"

    ### prepare
    set_argv("prepare", "dev-23a")
    mock = mocker.patch(f"{module_path}.prepare_environment", return_value=None)
    mod.main()
    call_argument = mock.call_args[0][0]
    assert call_argument == "dev-23a"

    ### install
    mock = mocker.patch(f"{module_path}.install_environment", return_value=None)
    set_argv("install", "dev-23a", "foss2022a-mpi")
    mod.main()
    assert mock.call_args[0][0] == "dev-23a"
    assert mock.call_args[0][1] == ["foss2022a-mpi"]

    set_argv("install", "23b", "foss2022a-mpi", "foss2022a-serial")
    mod.main()
    assert mock.call_args[0][0] == "23b"
    assert mock.call_args[0][1] == ["foss2022a-mpi", "foss2022a-serial"]

    ### status
    mock = mocker.patch(f"{module_path}.environment_status", return_value=None)
    set_argv("status", "dev-23a")
    mod.main()
    assert mock.call_args[0][0] == "dev-23a"

    ### remove (argparse doesn't allow this yet.
    ### Copy from 'install' when the time has come.)


def test_interface(tmp_path):
    """Test other things (not implemented yet)."""
    # Placeholder: nothing is tested so far. Checks that should be added:
    # - installing without package_sets only passes the available package_sets
    # - the script branch and hash are correct when running the script
    # - the help message is printed when no arguments are provided
    # - the help message is printed when -h is provided
    # - the error messages are also logged to the log file
    # - `/` in release is handled correctly
    # - the cmd_log file contains sys arguments
    # - the cmd_log file contains the script version for init
    # - the cmd_log file contains the location of APEX log
    return None


# other tests to add (ideally)
# - get_native_microarchitecture()