exportjobinfo.py 9.88 KB
Newer Older
1
2
3
4
5
6
7
8
import subprocess
import csv
import re
import time
from datetime import datetime
from io import StringIO
import requests
from requests.auth import HTTPBasicAuth
9
10
from db.influx import common
from db.influx.metrics import MType
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import conf.config as conf
import conf.influxdb as confdb

# Table of exported job-info fields and where each one comes from.
# Keys are the internal field names; every entry holds:
#   "rename": name written to the output line (read by format_fields),
#   "type":   MType of the value (read by format_fields and passed, via this
#             table, to common.format_metric),
#   "lsf":    column name looked up in the LSF query result,
#   "slurm":  column name looked up in the Slurm query result,
#   "regex":  optional extraction pattern.
#             NOTE(review): "regex" is not read anywhere in the visible code;
#             parse_job_info_lsf keeps its own copy of these patterns.
_info = {
    "user_name":        {"rename": "user_name",
                         "type": MType.STR,
                         "lsf": "USER",
                         "slurm": "User"},
    "used_queue":       {"rename": "used_queue",
                         "type": MType.STR,
                         "lsf": "QUEUE",
                         "slurm": "Partition"},
    "submit_time":      {"rename": "submit_time",
                         "type": MType.INT,
                         "lsf": "SUBMIT_TIME",
                         "slurm": "Submit"},
    "start_time":       {"rename": "start_time",
                         "type": MType.INT,
                         "lsf": "START_TIME",
                         "slurm": "Start"},
    "end_time":         {"rename": "end_time",
                         "type": MType.INT,
                         "lsf": "FINISH_TIME",
                         "slurm": "End"},
    "run_time":         {"rename": "run_time",
                         "type": MType.INT,
                         "lsf": "RUN_TIME",
                         "slurm": "Elapsed",
                         "regex": "([0-9]+)"},
    "requested_time":   {"rename": "requested_time",
                         "type": MType.INT,
                         "lsf": "RUNTIMELIMIT",
                         "slurm": "Timelimit",
                         "regex": r"([0-9]+\.[0-9]+)"},
    "requested_cu":     {"rename": "requested_cu",
                         "type": MType.INT,
                         "lsf": "SLOTS",
                         "slurm": "NCPUS"},
    "num_nodes":        {"rename": "num_nodes",
                         "type": MType.INT,
                         "lsf": "NEXEC_HOST",
                         "slurm": "NNodes"},
    # Note: the internal key is "alloc_slots" but it is exported as "alloc_cu".
    "alloc_slots":      {"rename": "alloc_cu",
                         "type": MType.STR,
                         "lsf": "ALLOC_SLOT",
                         "slurm": "NodeList"},
}


def get_by_regex(v, regex):
    """Return the first capture group of ``regex`` found in ``v``.

    The pattern is searched anywhere inside ``v`` (``re.search``).

    Args:
        v: String to search.
        regex: Pattern containing at least one capture group.

    Returns:
        The text of group 1, or ``None`` when the pattern does not match.
    """
    match = re.search(regex, v)
    return match.group(1) if match is not None else None


def date_to_timestamp(v, format):
    """Convert a local date/time string into a Unix timestamp.

    When ``format`` carries no year directive (e.g. the LSF style
    ``"%b %d %H:%M"``), the current year is assumed. When the format does
    contain a year, the parsed year is kept as-is; it used to be overwritten
    with the current year, which produced wrong timestamps for jobs from
    earlier years.

    Args:
        v: Date string to parse, or ``None``.
        format: ``datetime.strptime`` format describing ``v``.

    Returns:
        Seconds since the epoch as returned by ``time.mktime`` (local time),
        or ``None`` when ``v`` is ``None``.

    Raises:
        ValueError: If ``v`` does not match ``format``.
    """
    if v is None:
        return None

    has_year = any(d in format for d in ("%Y", "%y", "%c", "%x"))
    if has_year:
        parsed = datetime.strptime(v, format)
    else:
        # Parse together with the current year instead of patching the year
        # afterwards: strptime defaults to 1900, which is not a leap year and
        # would therefore reject "Feb 29".
        year = datetime.today().year
        parsed = datetime.strptime(str(year) + " " + v, "%Y " + format)

    return time.mktime(parsed.timetuple())


def parse_job_info_lsf(raw_data):
    """Turn one row of LSF job output into job info and per-node allocation.

    Args:
        raw_data: Mapping from LSF column name (the ``"lsf"`` entries of
            ``_info``) to the raw string value.

    Returns:
        A tuple ``(job_info, alloc_info)``. ``job_info`` maps every key of
        ``_info`` to its value after ``common.format_metric``;
        ``alloc_info`` maps node name to ``{"cpu": <int>, "mem": 0}``.

    Raises:
        ValueError: If a required LSF column is missing from ``raw_data`` or
            an allocation entry is not of the form ``<cu>*<node>``.
    """
    time_fields = ["submit_time", "start_time", "end_time"]
    reg_fields = {
        "run_time": "([0-9]+)",
        "requested_time": r"([0-9]+\.[0-9]+)",
    }

    job_info = {}
    for m, par in _info.items():
        if par["lsf"] not in raw_data:
            raise ValueError("Couln't parse value", par["lsf"])
        job_info[m] = raw_data[par["lsf"]]

    # prepare raw data

    # dates to timestamp; a value without a recognizable date becomes None
    for f in time_fields:
        d = get_by_regex(job_info[f], "([A-Za-z]+ +[0-9]+ [0-9:]+)")
        job_info[f] = date_to_timestamp(d, "%b %d %H:%M")

    # regexes: keep only the numeric part of the duration fields
    for f, r in reg_fields.items():
        job_info[f] = get_by_regex(job_info[f], r)

    # reformat alloc info
    # The allocation string is split as "<cu>*<node>:<cu>*<node>...".
    # It must be read from the value already stored in job_info; using that
    # value as a key into raw_data (as before) raised KeyError.
    alloc_info = {}
    for n in job_info["alloc_slots"].split(":"):
        cu, node_id = n.split("*")
        alloc_info[node_id] = {"cpu": int(cu), "mem": 0}

    # format values
    for m in _info:
        job_info[m] = common.format_metric(_info, m, job_info[m])

    # reevaluation

    # requested time from min to sec
    if job_info["requested_time"] is not None:
        job_info["requested_time"] *= 60

    return job_info, alloc_info
125
126
127
128
129
130


def slurm_time_to_sec(slurm_time):
    """Convert a Slurm duration string into seconds.

    Accepted shapes are ``MM:SS``, ``HH:MM:SS`` and ``D-HH:MM:SS`` (any
    number of day digits).

    Args:
        slurm_time: Duration string, or ``None``.

    Returns:
        The duration in seconds, or ``None`` when ``slurm_time`` is ``None``.

    Raises:
        ValueError: If no duration can be found in ``slurm_time``
            (e.g. ``"UNLIMITED"``).
    """
    if slurm_time is None:
        return None

    # 2:days, 4:hours, 5:minutes, 6:seconds
    m = re.search(
        r"(([0-9]+)-)?(([0-9]{2}):)?([0-9]{2}):([0-9]{2})", slurm_time)

    # Minutes and seconds are mandatory in the pattern, so a failed parse
    # shows up as "no match" rather than as empty groups.
    if m is None:
        raise ValueError("Couldn't parse time", slurm_time)

    secs = int(m.group(5)) * 60 + int(m.group(6))

    if m.group(4) is not None:
        secs += int(m.group(4)) * 3600

    if m.group(2) is not None:
        secs += int(m.group(2)) * 86400

    return secs


akhuziy's avatar
akhuziy committed
149
def slurm_format_alloc(nodes_info):
    """Serialize per-node allocations into a single string.

    Args:
        nodes_info: Mapping of node name to a dict with ``"cpu"`` and
            ``"mem"`` entries.

    Returns:
        ``"<cpu>*<mem>*<node>"`` entries joined by ``":"``; an empty string
        for an empty mapping.
    """
    entries = []
    for node, alloc in nodes_info.items():
        entries.append("{}*{}*{}".format(alloc["cpu"], alloc["mem"], node))

    return ":".join(entries)

def slurm_parse_nodes_info(nodes):
    """Expand a Slurm node expression into a list of host names.

    A value without ``","``, ``"["`` or ``"-"`` is treated as a single host
    and returned unchanged. Anything else is handed to
    ``scontrol show hostname`` and the command's output lines are returned.

    Args:
        nodes: Node expression as a string.

    Returns:
        List of host names (empty if the command printed nothing).

    Raises:
        OSError: If ``scontrol`` cannot be executed.
    """
    if not any(marker in nodes for marker in (",", "[", "-")):
        return [nodes]

    # Pass the expression as a single argument instead of through a shell:
    # unquoted "[...]" is a glob pattern for bash and the value would
    # otherwise be open to shell injection.
    result = subprocess.run(["scontrol", "show", "hostname", nodes],
                            stdout=subprocess.PIPE)

    return result.stdout.decode("utf-8").splitlines()

def slurm_parse_cpu_info(cpus):
    """Count the CPUs described by a comma-separated id list.

    Each comma-separated entry is either a single id (counted as one) or an
    inclusive range ``"a-b"`` (counted as ``b - a + 1``).

    Args:
        cpus: String such as ``"0-3,8"``.

    Returns:
        Total number of CPUs as an int.
    """
    def _entry_size(entry):
        # Entries without a dash count as exactly one CPU.
        if "-" not in entry:
            return 1
        bounds = entry.split("-")
        return int(bounds[1]) - int(bounds[0]) + 1

    return sum(_entry_size(entry) for entry in cpus.split(","))


def slurm_fetch_alloc_info(job_id):
    """Query ``scontrol show job -d`` and extract the per-node allocation.

    Args:
        job_id: Identifier of the job to query.

    Returns:
        Mapping of node name to ``{"cpu": <int>, "mem": <int or "">}``.
        ``"mem"`` is ``""`` when the captured memory value does not start
        with a digit.

    Raises:
        ValueError: If the command cannot be run or its output contains no
            ``Nodes=... CPU_IDs=... Mem=...`` entry.
    """
    # Run without a shell so that job_id can never be interpreted as shell
    # syntax.
    try:
        result = subprocess.run(
            ["scontrol", "show", "job", "-d", str(job_id)],
            stdout=subprocess.PIPE)
    except OSError as err:
        # Keep ValueError as the single failure type of this function.
        raise ValueError("Cannot parse Slurm allocation info",
                         str(err)) from err

    out = result.stdout.decode("utf-8")

    alloc_raw = re.findall("Nodes=([^ ]+) +CPU_IDs=([^ ]+) +Mem=([ 0-9]+)", out)
    if not alloc_raw:
        raise ValueError("Cannot parse Slurm allocation info", out)

    allocs = {}
    for (nodes_list_raw, cpu_list_raw, mem_alloc_raw) in alloc_raw:
        nodes_list = slurm_parse_nodes_info(nodes_list_raw)
        cpu_count = slurm_parse_cpu_info(cpu_list_raw)

        if re.match(r"[0-9]+", mem_alloc_raw):
            mem_alloc = int(mem_alloc_raw)
        else:
            mem_alloc = ""

        # Every node of one entry gets the same cpu count and memory value.
        for node_name in nodes_list:
            allocs[node_name] = {"cpu": cpu_count, "mem": mem_alloc}

    return allocs
212

akhuziy's avatar
akhuziy committed
213
def parse_job_info_slurm(raw_data, alloc_info):
    """Turn one row of Slurm job output into formatted job info.

    Args:
        raw_data: Mapping from Slurm column name (the ``"slurm"`` entries of
            ``_info``) to the raw string value.
        alloc_info: Per-node allocation mapping, serialized into the
            ``"alloc_slots"`` field via ``slurm_format_alloc``.

    Returns:
        Dict mapping every key of ``_info`` to its value after
        ``common.format_metric``.

    Raises:
        ValueError: If a required Slurm column is missing from ``raw_data``.
    """
    timestamp_fields = ("submit_time", "start_time", "end_time")
    duration_fields = ("run_time", "requested_time")

    # Collect the raw value for each known field.
    job_info = {}
    for field, spec in _info.items():
        column = spec["slurm"]
        if column not in raw_data:
            raise ValueError("Couln't parse value", column)
        job_info[field] = raw_data[column]

    # Dates to timestamps; an "Unknown" end time is stored as 0.
    for field in timestamp_fields:
        raw_value = job_info[field]
        if field == "end_time" and raw_value == "Unknown":
            job_info[field] = 0
            continue
        job_info[field] = date_to_timestamp(raw_value, "%Y-%m-%dT%H:%M:%S")

    # Slurm durations to seconds.
    for field in duration_fields:
        job_info[field] = slurm_time_to_sec(job_info[field])

    # The allocation field is replaced by the serialized alloc_info.
    job_info["alloc_slots"] = slurm_format_alloc(alloc_info)

    # Final per-field formatting.
    for field in _info:
        job_info[field] = common.format_metric(_info, field, job_info[field])

    return job_info


def format_fields(m, val, metrics):
    """Render one field as ``<name>=<value>``.

    Args:
        m: Field key; used to look up the type in ``metrics`` and the output
            name in ``_info``.
        val: Field value. Values of type ``MType.STR`` are wrapped in double
            quotes; all others are inserted as-is.
        metrics: Mapping of field key to a dict with a ``"type"`` entry.

    Returns:
        The formatted ``name=value`` string.
    """
    is_string = metrics[m]["type"] == MType.STR
    rendered = '"{:s}"'.format(val) if is_string else val
    return "{}={}".format(_info[m]["rename"], rendered)


akhuziy's avatar
akhuziy committed
252
253
254
255
def format_influxdb_alloc(job_id, alloc_info, cur_time):
    """Build one output line per node describing the job's allocation.

    The measurement name is ``conf.job_info["measurement_name"]`` with the
    suffix ``"-alloc"``.

    Args:
        job_id: Job identifier (string), written as the ``jobid`` tag.
        alloc_info: Mapping of node name to a dict with ``"cpu"`` and
            ``"mem"`` entries.
        cur_time: Integer timestamp appended to every line.

    Returns:
        All lines concatenated, each terminated by a newline; an empty string
        when ``alloc_info`` is empty.
    """
    mname = conf.job_info["measurement_name"] + "-alloc"

    lines = []
    for node_name, alloc_data in alloc_info.items():
        alloc_cu = alloc_data["cpu"]
        alloc_mem = alloc_data["mem"]

        tags = "{:s},jobid={:s},host={:s} ".format(mname, job_id, node_name)
        fields = "alloc_cu={},".format(alloc_cu) + "alloc_mem={}".format(alloc_mem)
        stamp = " {:d}\n".format(cur_time)
        lines.append(tags + fields + stamp)

    return "".join(lines)

def format_influx_db_out(job_id, job_data, alloc_data):
    """Build the complete write payload for one job.

    The payload consists of one line with all job fields followed by the
    per-node allocation lines from ``format_influxdb_alloc``; every line
    carries the same timestamp, taken once at call time.

    Args:
        job_id: Job identifier (string), written as the ``jobid`` tag.
        job_data: Mapping of field key to value; keys must exist in ``_info``.
        alloc_data: Per-node allocation mapping.

    Returns:
        The payload as a single newline-terminated string.
    """
    cur_time = int(time.time())
    mname = conf.job_info["measurement_name"]

    header = "{:s},jobid={:s} ".format(mname, job_id)
    fields = ",".join(format_fields(k, v, _info) for k, v in job_data.items())
    job_line = header + fields + " {:d}\n".format(cur_time)

    return job_line + format_influxdb_alloc(job_id, alloc_data, cur_time)
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315

def format_format_str(batch, qdelim):
    """Build the output-format argument for the batch system's query command.

    Args:
        batch: Key selecting the column names in ``_info``
            (``"lsf"`` or ``"slurm"``).
        qdelim: Separator placed between the column names.

    Returns:
        The joined column names; ``" delimiter=','"`` is appended when
        ``conf.BATCH_SYSTEM`` is ``"LSF"``.
    """
    columns = [spec[batch] for spec in _info.values()]
    oformat = qdelim.join(columns)

    # NOTE(review): the LSF suffix depends on the configured batch system,
    # not on the ``batch`` argument.
    if conf.BATCH_SYSTEM != "LSF":
        return oformat

    return oformat + " delimiter=','"


def fetch_job_data_general(job_id, batch, qdelim, rdelim):
    """Run the configured job-info command and return its first data row.

    The command template is read from
    ``conf.job_info["fetch_job_info_<batch>"]`` and filled with the output
    format string and ``job_id``. Its output is parsed as delimited text with
    a header line.

    Args:
        job_id: Job identifier inserted into the command.
        batch: Batch-system key (``"lsf"`` or ``"slurm"``).
        qdelim: Separator used between column names in the query.
        rdelim: Field delimiter of the command's output.

    Returns:
        Mapping of column name to raw string value for the first row.

    Raises:
        ValueError: If the command produced no data row.
    """
    fetch_comm_temp = conf.job_info["fetch_job_info_{:s}".format(batch)]

    oformat = format_format_str(batch, qdelim)

    fetch_comm = fetch_comm_temp.format(oformat, job_id)
    # NOTE(review): the template is a shell command line, so job_id ends up
    # in a shell string; callers must pass a trusted/validated job id.
    result = subprocess.run(fetch_comm, stdout=subprocess.PIPE, shell=True,
                            executable='/bin/bash')

    out = result.stdout.decode("utf-8")

    reader = csv.DictReader(StringIO(out), delimiter=rdelim)

    # An empty result (e.g. unknown job) used to leak a bare StopIteration;
    # report it like every other failure in this module.
    raw_data = next(reader, None)
    if raw_data is None:
        raise ValueError("Couldn't fetch job info", job_id)

    return raw_data


def fetch_job_data(job_id):
    """Fetch and parse job information for the configured batch system.

    Args:
        job_id: Identifier of the job to look up.

    Returns:
        A tuple ``(job_info, node_info)`` with the formatted job fields and
        the per-node allocation mapping.

    Raises:
        ValueError: If ``conf.BATCH_SYSTEM`` is neither ``"LSF"`` nor
            ``"SLURM"``.
    """
    batchsys = conf.BATCH_SYSTEM

    if batchsys not in ("LSF", "SLURM"):
        raise ValueError("Configured batch system is not supported")

    if batchsys == "LSF":
        # LSF: the allocation is derived from the same query result.
        raw_info = fetch_job_data_general(job_id, "lsf", " ", ",")
        job_info, node_info = parse_job_info_lsf(raw_info)
        return job_info, node_info

    # SLURM: the allocation needs a separate query.
    raw_info = fetch_job_data_general(job_id, "slurm", ",", "|")
    node_info = slurm_fetch_alloc_info(job_id)
    job_info = parse_job_info_slurm(raw_info, node_info)
    return job_info, node_info
325
326

def export_job_info(job_id):
    """Collect the data of one job and write it to InfluxDB.

    The payload is POSTed to ``<api_url>/write`` with HTTP basic auth, using
    the connection settings from ``confdb.IDB`` and second precision.

    Args:
        job_id: Identifier of the job to export.

    Returns:
        ``True`` when the server answers with status 204.

    Raises:
        ValueError: If the response status is anything other than 204.
    """
    job_info, alloc_info = fetch_job_data(job_id)
    fdata = format_influx_db_out(job_id, job_info, alloc_info)

    payload = {'db': confdb.IDB["database"], 'precision': 's'}
    write_url = "{:s}/write".format(confdb.IDB["api_url"])
    credentials = HTTPBasicAuth(confdb.IDB["username"], confdb.IDB["password"])

    # NOTE(review): no timeout is passed to requests.post here.
    r = requests.post(write_url, auth=credentials, params=payload, data=fdata)

    if r.status_code == 204:
        return True

    raise ValueError("Couldn't write to InfluxDB", r.status_code, r.content)