exportjobinfo.py 9.42 KB
Newer Older
1
2
3
4
5
6
7
8
import subprocess
import csv
import re
import time
from datetime import datetime
from io import StringIO
import requests
from requests.auth import HTTPBasicAuth
9
10
from db.influx import common
from db.influx.metrics import MType
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import conf.config as conf
import conf.influxdb as confdb

# Table of exported job metrics.  Each row:
#   (key, influx rename, value type, LSF column, Slurm column, extraction regex)
# The regex (LSF only) pulls the numeric part out of the raw column value.
# NOTE: insertion order matters — format_format_str() emits the query
# columns in this order, so it must match the parsed CSV row.
_FIELD_TABLE = (
    ("user_name",      "user_name",      MType.STR, "USER",         "User",      None),
    ("used_queue",     "used_queue",     MType.STR, "QUEUE",        "Partition", None),
    ("submit_time",    "submit_time",    MType.INT, "SUBMIT_TIME",  "Submit",    None),
    ("start_time",     "start_time",     MType.INT, "START_TIME",   "Start",     None),
    ("end_time",       "end_time",       MType.INT, "FINISH_TIME",  "End",       None),
    ("run_time",       "run_time",       MType.INT, "RUN_TIME",     "Elapsed",   "([0-9]+)"),
    ("requested_time", "requested_time", MType.INT, "RUNTIMELIMIT", "Timelimit", r"([0-9]+\.[0-9]+)"),
    ("requested_cu",   "requested_cu",   MType.INT, "SLOTS",        "NCPUS",     None),
    ("num_nodes",      "num_nodes",      MType.INT, "NEXEC_HOST",   "NNodes",    None),
    ("alloc_slots",    "alloc_cu",       MType.STR, "ALLOC_SLOT",   "NodeList",  None),
)

_info = {}
for _key, _rename, _mtype, _lsf, _slurm, _regex in _FIELD_TABLE:
    _info[_key] = {"rename": _rename,
                   "type": _mtype,
                   "lsf": _lsf,
                   "slurm": _slurm}
    if _regex is not None:
        _info[_key]["regex"] = _regex


def get_by_regex(v, regex):
    """Return the first capture group of *regex* found in *v*, or None."""
    match = re.search(regex, v)
    return match.group(1) if match is not None else None


def date_to_timestamp(v, format):
    """Convert date string *v*, parsed with *format*, to a Unix timestamp.

    The parsed date's year is forced to the current year, since some
    batch-system date columns omit the year.  Returns None when *v* is
    None; the timestamp uses the local timezone (time.mktime).
    """
    if v is None:
        return None

    parsed = datetime.strptime(v, format)
    this_year = parsed.replace(year=datetime.today().year)
    return time.mktime(this_year.timetuple())


def parse_job_info_lsf(raw_data):
    """Parse one row of raw LSF accounting output.

    raw_data: dict keyed by LSF column names (one CSV row from bjobs/bhist).
    Returns a (job_info, alloc_info) pair: job_info keyed by the canonical
    metric names of _info, alloc_info mapping node name -> allocated CUs.
    Raises ValueError when an expected LSF column is missing.
    """
    time_fields = ("submit_time", "start_time", "end_time")

    # pick out the raw values by their LSF column names
    job_info = {}
    for m, par in _info.items():
        if par["lsf"] not in raw_data:
            raise ValueError("Couln't parse value", par["lsf"])
        job_info[m] = raw_data[par["lsf"]]

    # dates to timestamps; LSF prints e.g. "Mar  5 10:30" without a year
    for f in time_fields:
        d = get_by_regex(job_info[f], "([A-Za-z]+ +[0-9]+ [0-9:]+)")
        job_info[f] = date_to_timestamp(d, "%b %d %H:%M")

    # extract the numeric part of duration fields; use the per-field
    # regexes already declared in _info instead of a duplicate table
    for m, par in _info.items():
        if "regex" in par:
            job_info[m] = get_by_regex(job_info[m], par["regex"])

    # reformat allocation info: "cu*node:cu*node:..." -> {node: cu}
    alloc_info = {}

    # BUG FIX: the raw allocation string is already the *value* stored in
    # job_info; the old code used it as a key into raw_data (KeyError).
    nodes_raw = job_info["alloc_slots"].split(":")
    for n in nodes_raw:
        cu, node_id = n.split("*")
        alloc_info[node_id] = int(cu)

    # format values according to their declared MType
    for m, par in _info.items():
        job_info[m] = common.format_metric(_info, m, job_info[m])

    # requested time is reported in minutes; convert to seconds
    if job_info["requested_time"] is not None:
        job_info["requested_time"] *= 60

    return job_info, alloc_info


def slurm_time_to_sec(slurm_time):
    """Convert a Slurm duration string to seconds.

    Accepts "[DD-][HH:]MM:SS" forms (e.g. "01:02:03", "2-01:02:03").
    Returns None for None input; raises ValueError when the string
    cannot be parsed at all (e.g. "UNLIMITED").
    """
    if slurm_time is None:
        return None

    # group 2: days, group 4: hours, group 5: minutes, group 6: seconds
    m = re.search(
        "(([0-9]{1,2})-)?(([0-9]{2}):)?([0-9]{2}):([0-9]{2})", slurm_time)

    # BUG FIX: a completely unmatched string previously raised
    # AttributeError on m.group(); report it as a parse error instead.
    if m is None or m.group(5) is None or m.group(6) is None:
        raise ValueError("Couldn't parse time", slurm_time)

    secs = int(m.group(5)) * 60 + int(m.group(6))

    if m.group(4) is not None:
        secs += int(m.group(4)) * 3600

    if m.group(2) is not None:
        secs += int(m.group(2)) * 86400

    return secs


akhuziy's avatar
akhuziy committed
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def slurm_format_alloc(nodes_info):
    """Serialize a {node: cu} mapping into "cu*node:cu*node:..." form."""
    parts = []
    for node, cu in nodes_info.items():
        parts.append("{}*{}".format(cu, node))
    return ":".join(parts)

def slurm_parse_nodes_info(nodes):
    """Expand a Slurm node-list expression into a list of host names.

    A plain single host name is returned as a one-element list; anything
    containing a list marker (",", "[", "-") is expanded by shelling out
    to `scontrol show hostname`.
    """
    if not any(marker in nodes for marker in (",", "[", "-")):
        return [nodes]

    command = "scontrol show hostname {:s}".format(nodes)
    proc = subprocess.run(command, stdout=subprocess.PIPE, shell=True,
                          executable='/bin/bash')
    return proc.stdout.decode("utf-8").splitlines()

def slurm_parse_cpu_info(cpus):
    """Count CPUs in a Slurm CPU_IDs expression such as "0-3,7,10-11".

    Comma-separated entries are either single IDs or inclusive ranges.
    """
    total = 0
    for chunk in cpus.split(","):
        if "-" in chunk:
            lo, hi = chunk.split("-")
            total += int(hi) - int(lo) + 1
        else:
            total += 1
    return total


def slurm_fetch_alloc_info(job_id):
    """Query `scontrol show job -d` and return {node_name: cpu_count}.

    Parses the "Nodes=... CPU_IDs=..." records of the detailed job view.
    Raises ValueError when the output contains no allocation records.
    """
    fetch_comm = "scontrol show job -d {:s}".format(job_id)
    proc = subprocess.run(fetch_comm, stdout=subprocess.PIPE, shell=True,
                          executable='/bin/bash')
    out = proc.stdout.decode("utf-8")

    alloc_raw = re.findall("Nodes=([^ ]+) +CPU_IDs=([^ ]+)", out)
    if not alloc_raw:
        raise ValueError("Cannot parse Slurm allocation info", out)

    allocs = {}
    for nodes_expr, cpu_expr in alloc_raw:
        cpu_count = slurm_parse_cpu_info(cpu_expr)
        # every node of this record gets the same per-node CPU count
        for node_name in slurm_parse_nodes_info(nodes_expr):
            allocs[node_name] = cpu_count

    return allocs
208

akhuziy's avatar
akhuziy committed
209
def parse_job_info_slurm(raw_data, alloc_info):
    """Parse one row of raw sacct output into canonical job_info.

    raw_data: dict keyed by sacct column names (one delimited row).
    alloc_info: {node: cu} mapping, e.g. from slurm_fetch_alloc_info().
    Raises ValueError when an expected sacct column is missing.
    """
    timestamp_fields = ("submit_time", "start_time", "end_time")
    duration_fields = ("run_time", "requested_time")

    # pick out the raw values by their sacct column names
    job_info = {}
    for metric, spec in _info.items():
        if spec["slurm"] not in raw_data:
            raise ValueError("Couln't parse value", spec["slurm"])
        job_info[metric] = raw_data[spec["slurm"]]

    # ISO-style sacct dates to Unix timestamps
    for field in timestamp_fields:
        job_info[field] = date_to_timestamp(job_info[field],
                                            "%Y-%m-%dT%H:%M:%S")

    # Slurm duration strings to plain seconds
    for field in duration_fields:
        job_info[field] = slurm_time_to_sec(job_info[field])

    # replace the raw NodeList value with the formatted per-node CU string
    job_info["alloc_slots"] = slurm_format_alloc(alloc_info)

    # format values according to their declared MType
    for metric, spec in _info.items():
        job_info[metric] = common.format_metric(_info, metric,
                                                job_info[metric])

    return job_info


def format_fields(m, val, metrics):
    """Render metric *m* as one InfluxDB line-protocol field "name=value".

    String-typed values are wrapped in double quotes as the line
    protocol requires.  *metrics* is a table shaped like _info.
    """
    if metrics[m]["type"] == MType.STR:
        val = "\"{:s}\"".format(val)
    # consistency fix: look up the rename in the passed-in metrics table
    # (as the type lookup above does) instead of the module-level _info
    return "{}={}".format(metrics[m]["rename"], val)


akhuziy's avatar
akhuziy committed
245
246
247
248
249
def format_influxdb_alloc(job_id, alloc_info, cur_time):
    """Build InfluxDB line-protocol rows for the per-node CU allocation.

    Emits one line per node into the "<measurement>-alloc" measurement,
    tagged with jobid and host, all stamped with *cur_time* (seconds).
    """
    mname = conf.job_info["measurement_name"] + "-alloc"

    lines = [
        "{:s},jobid={:s},host={:s} ".format(mname, job_id, node_name)
        + "alloc_cu={}".format(alloc_cu)
        + " {:d}\n".format(cur_time)
        for node_name, alloc_cu in alloc_info.items()
    ]

    return "".join(lines)

def format_influx_db_out(job_id, job_data, alloc_data):
    """Assemble the full InfluxDB line-protocol payload for one job.

    Produces the main job-info line followed by the per-node allocation
    lines, all sharing a single "now" timestamp in seconds.
    """
    now = int(time.time())
    measurement = conf.job_info["measurement_name"]

    fields = ",".join(format_fields(k, v, _info)
                      for k, v in job_data.items())
    job_line = ("{:s},jobid={:s} ".format(measurement, job_id)
                + fields
                + " {:d}\n".format(now))

    return job_line + format_influxdb_alloc(job_id, alloc_data, now)
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305

def format_format_str(batch, qdelim):
    """Build the accounting-query format string listing every column.

    *batch* is "lsf" or "slurm"; *qdelim* joins the column names.
    LSF additionally needs an explicit output-delimiter option so the
    result can be parsed as comma-separated CSV.
    """
    oformat = qdelim.join(spec[batch] for spec in _info.values())

    if conf.BATCH_SYSTEM == "LSF":
        oformat += " delimiter=','"

    return oformat


def fetch_job_data_general(job_id, batch, qdelim, rdelim):
    """Run the configured accounting command and parse its first row.

    *batch* is "lsf" or "slurm"; *qdelim* separates column names in the
    query format, *rdelim* is the delimiter of the command's output.
    Returns the first output row as a dict keyed by column name.
    """
    command_template = conf.job_info["fetch_job_info_{:s}".format(batch)]
    command = command_template.format(format_format_str(batch, qdelim),
                                      job_id)

    proc = subprocess.run(command, stdout=subprocess.PIPE, shell=True,
                          executable='/bin/bash')
    output = proc.stdout.decode("utf-8")

    reader = csv.DictReader(StringIO(output), delimiter=rdelim)
    return next(reader)


def fetch_job_data(job_id):
    """Fetch and parse accounting data for *job_id* from the batch system.

    Dispatches on conf.BATCH_SYSTEM ("LSF" or "SLURM").  Returns a
    (job_info, node_info) pair where node_info maps node -> CUs.
    Raises ValueError for an unsupported batch system.
    """
    batch_system = conf.BATCH_SYSTEM

    if batch_system == "LSF":
        raw = fetch_job_data_general(job_id, "lsf", " ", ",")
        job_info, node_info = parse_job_info_lsf(raw)
    elif batch_system == "SLURM":
        raw = fetch_job_data_general(job_id, "slurm", ",", "|")
        # allocation must be fetched first: the parser embeds it
        node_info = slurm_fetch_alloc_info(job_id)
        job_info = parse_job_info_slurm(raw, node_info)
    else:
        raise ValueError("Configured batch system is not supported")

    return job_info, node_info
315
316

def export_job_info(job_id):
    """Export accounting info for *job_id* into InfluxDB.

    Fetches the data from the batch system, formats it as line protocol
    and POSTs it to the configured InfluxDB /write endpoint with basic
    auth.  Raises ValueError when InfluxDB rejects the write; returns
    True on success.
    """
    job_info, alloc_info = fetch_job_data(job_id)
    body = format_influx_db_out(job_id, job_info, alloc_info)

    params = {'db': confdb.IDB["database"], 'precision': 's'}
    write_url = "{:s}/write".format(confdb.IDB["api_url"])
    auth = HTTPBasicAuth(confdb.IDB["username"], confdb.IDB["password"])

    response = requests.post(write_url, auth=auth, params=params, data=body)

    # InfluxDB answers 204 No Content on a successful write
    if response.status_code != 204:
        raise ValueError("Couldn't write to InfluxDB",
                         response.status_code, response.content)

    return True