influxdb_fetchjob.py 2.37 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# db/getjob.py gets job information from the file created by postexec script
# or directly from the job system

import conf.config as conf
from . import common
from . import metrics

m_jobs = conf.measurements["jobs"]

def get_job_query(job_id):
    query = (
        'SELECT last(*) FROM "{:s}" '
        'WHERE "jobid" = \'{:s}\';'
    ).format(m_jobs, job_id)

    return query

def parse_job_response(data=None):
    job = {}

    if len(data) == 0:
        raise RuntimeError("Job query returned empty response")
    if "series" not in data[0]:
        raise RuntimeError("Job query was not executed correctly", data[0])
    if len(data[0]['series']) == 0:
        raise RuntimeError("Job was not found", data[0])

    for i in range(0, len(data[0]['series'][0]['columns'])):
        metric = data[0]['series'][0]['columns'][i]
        job[metric] = data[0]['series'][0]['values'][0][i]

    return job

34
35
36
def parse_alloc_res(alloc_info):
    alloc_cu = {}
    alloc_mem = {}
37
38
39

    nodes_raw = alloc_info.split(":")
    for n in nodes_raw:
40
41
42
43
44
45
46
47
48
49
50
        resources = n.split("*")
        nres = len(resources)
        if nres == 3:
            cu = resources[0]
            mem = resources[1]
            node_id = resources[2]
            fmem = common.format_metric(metrics.metrics_node, "alloc_mem", mem)
            alloc_mem[node_id] = fmem
        elif nres == 2:
            cu = resources[0]
            node_id = resources[1]
51
            alloc_mem[node_id] = -1
52
53
        else:
            raise RuntimeError("Allocated resourses couldn't be parsed")
54

55
        fcu = common.format_metric(metrics.metrics_node, "alloc_cu", cu)
56
        alloc_cu[node_id] = fcu
57

58
    return alloc_cu, alloc_mem
59
60
61
62
63
64

def format_job_info(jobinfo_init):
    jobinfo = {}

    for name, par in metrics.metrics_job.items():
        raw_value = jobinfo_init.get("last_{:s}".format(par["dbname"]), None)
65
66
67
68

        if name == "exit_code" and raw_value is None:
            raw_value = -1

69
        value = common.format_metric(metrics.metrics_job, name, raw_value)
70
        jobinfo[name] = value
71
72
73
74
75
76
77
78
79
80
81
82
83

    return jobinfo

def get_job_data(job_id):
    query = get_job_query(job_id)

    jobdata_raw = common.fetch_data(query)

    jobdata_parsed = parse_job_response(jobdata_raw)

    jobinfo = format_job_info(jobdata_parsed)

    # Special case: Allocated CU
84
    alloc_cu, alloc_mem = parse_alloc_res(jobinfo["alloc_cu"])
85

86
    return jobinfo, alloc_cu, alloc_mem