exportjobinfo.py 9.45 KB
Newer Older
1
2
3
4
5
6
7
8
import subprocess
import csv
import re
import time
from datetime import datetime
from io import StringIO
import requests
from requests.auth import HTTPBasicAuth
9
10
from db.influx import common
from db.influx.metrics import MType
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import conf.config as conf
import conf.influxdb as confdb

_info = {
    "user_name":        {"rename": "user_name",
                         "type": MType.STR,
                         "lsf": "USER",
                         "slurm": "User"},
    "used_queue":       {"rename": "used_queue",
                         "type": MType.STR,
                         "lsf": "QUEUE",
                         "slurm": "Partition"},
    "submit_time":      {"rename": "submit_time",
                         "type": MType.INT,
                         "lsf": "SUBMIT_TIME",
                         "slurm": "Submit"},
    "start_time":       {"rename": "start_time",
                         "type": MType.INT,
                         "lsf": "START_TIME",
                         "slurm": "Start"},
    "end_time":         {"rename": "end_time",
                         "type": MType.INT,
                         "lsf": "FINISH_TIME",
                         "slurm": "End"},
    "run_time":         {"rename": "run_time",
                         "type": MType.INT,
                         "lsf": "RUN_TIME",
                         "slurm": "Elapsed",
                         "regex": "([0-9]+)"},
    "requested_time":   {"rename": "requested_time",
                         "type": MType.INT,
                         "lsf": "RUNTIMELIMIT",
                         "slurm": "Timelimit",
                         "regex": r"([0-9]+\.[0-9]+)"},
    "requested_cu":     {"rename": "requested_cu",
                         "type": MType.INT,
                         "lsf": "SLOTS",
                         "slurm": "NCPUS"},
    "num_nodes":        {"rename": "num_nodes",
                         "type": MType.INT,
                         "lsf": "NEXEC_HOST",
                         "slurm": "NNodes"},
    "alloc_slots":      {"rename": "alloc_cu",
                         "type": MType.STR,
                         "lsf": "ALLOC_SLOT",
                         "slurm": "NodeList"},
}


def get_by_regex(v, regex):
    """Return the first capture group of *regex* found in *v*, or None."""
    match = re.search(regex, v)
    return match.group(1) if match is not None else None


def date_to_timestamp(v, format):
    if v is None:
        return None

    raw_r = datetime.strptime(v, format)
    dt = raw_r.replace(year=datetime.today().year).timetuple()
    ts = time.mktime(dt)

    return ts


def parse_job_info_lsf(raw_data):
    """Parse raw LSF accounting output into job info and allocation info.

    Args:
        raw_data: dict mapping LSF column names (e.g. "USER") to raw
            string values, as produced by fetch_job_data_general.

    Returns:
        (job_info, alloc_info): job_info maps internal metric names to
        formatted values; alloc_info maps node name -> allocated CUs.

    Raises:
        ValueError: if a required LSF column is missing from raw_data.
    """
    time_fields = ["submit_time", "start_time", "end_time"]
    reg_fields = {
        "run_time": "([0-9]+)",
        "requested_time": r"([0-9]+\.[0-9]+)",
    }

    job_info = {}
    for m, par in _info.items():
        if par["lsf"] not in raw_data:
            raise ValueError("Couldn't parse value", par["lsf"])
        job_info[m] = raw_data[par["lsf"]]

    # prepare raw data

    # dates to timestamp (LSF prints no year, e.g. "Jan 15 10:30:00")
    for f in time_fields:
        d = get_by_regex(job_info[f], "([A-Za-z]+ +[0-9]+ [0-9:]+)")
        job_info[f] = date_to_timestamp(d, "%b %d %H:%M")

    # extract the numeric part of duration fields
    for f, r in reg_fields.items():
        job_info[f] = get_by_regex(job_info[f], r)

    # reformat alloc info: "cu*node:cu*node" -> {node: cu}
    alloc_info = {}

    # BUG FIX: job_info["alloc_slots"] already holds the raw allocation
    # string; it was wrongly used as a key back into raw_data.
    nodes_raw = job_info["alloc_slots"].split(":")
    for n in nodes_raw:
        cu, node_id = n.split("*")
        alloc_info[node_id] = int(cu)

    # format values according to their declared metric types
    for m, par in _info.items():
        job_info[m] = common.format_metric(_info, m, job_info[m])

    # reevaluation

    # requested time from min to sec
    if job_info["requested_time"] is not None:
        job_info["requested_time"] *= 60

    return job_info, alloc_info
125
126
127
128
129
130


def slurm_time_to_sec(slurm_time):
    """Convert a Slurm duration string ("[D-][HH:]MM:SS") to seconds.

    Args:
        slurm_time: duration string as printed by sacct (Elapsed,
            Timelimit), or None.

    Returns:
        Total seconds as int, or None when slurm_time is None.

    Raises:
        ValueError: if the string contains no parseable MM:SS part
            (e.g. "UNLIMITED").
    """
    if slurm_time is None:
        return None

    # groups: 2=days, 4=hours, 5=minutes, 6=seconds
    m = re.search(
        "(([0-9]{1,2})-)?(([0-9]{2}):)?([0-9]{2}):([0-9]{2})", slurm_time)

    # BUG FIX: a completely unmatched string (m is None) previously raised
    # AttributeError on m.group instead of the intended ValueError
    if m is None or m.group(5) is None or m.group(6) is None:
        raise ValueError("Couldn't parse time", slurm_time)

    secs = int(m.group(5)) * 60 + int(m.group(6))

    if m.group(4) is not None:
        secs += int(m.group(4)) * 3600

    if m.group(2) is not None:
        secs += int(m.group(2)) * 86400

    return secs


akhuziy's avatar
akhuziy committed
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def slurm_format_alloc(nodes_info):
    """Serialize a {node: cu} mapping into the LSF-style "cu*node:cu*node" string."""
    parts = []
    for node_name, cu in nodes_info.items():
        parts.append("{}*{}".format(cu, node_name))
    return ":".join(parts)

def slurm_parse_nodes_info(nodes):
    """Expand a Slurm node-list expression into a list of node names.

    A plain single-node string is returned as a one-element list; any
    expression containing list syntax (',', '[', '-') is expanded by
    shelling out to `scontrol show hostname`.
    """
    has_list_syntax = any(ch in nodes for ch in (",", "[", "-"))

    if not has_list_syntax:
        return [nodes]

    command = "scontrol show hostname {:s}".format(nodes)
    proc = subprocess.run(command, stdout=subprocess.PIPE, shell=True,
                          executable='/bin/bash')
    return proc.stdout.decode("utf-8").splitlines()

def slurm_parse_cpu_info(cpus):
    """Count the CPUs in a Slurm CPU_IDs expression such as "0-3,6,8-9"."""
    total = 0
    for item in cpus.split(","):
        if "-" in item:
            # inclusive range "a-b"
            low, high = item.split("-")
            total += int(high) - int(low) + 1
        else:
            # single CPU id
            total += 1
    return total


def slurm_fetch_alloc_info(job_id):
    """Fetch per-node CPU allocation of a Slurm job.

    Runs `scontrol show job -d` and parses the "Nodes=... CPU_IDs=..."
    pairs from its output.

    Returns:
        dict mapping node name -> number of allocated CPUs.

    Raises:
        ValueError: if no allocation info can be parsed from the output.
    """
    command = "scontrol show job -d {:s}".format(job_id)
    proc = subprocess.run(command, stdout=subprocess.PIPE, shell=True,
                          executable='/bin/bash')
    output = proc.stdout.decode("utf-8")

    pairs = re.findall("Nodes=([^ ]+) +CPU_IDs=([^ ]+)", output)

    if len(pairs) == 0:
        raise ValueError("Cannot parse Slurm allocation info", output)

    allocs = {}
    for nodes_expr, cpu_expr in pairs:
        cpu_count = slurm_parse_cpu_info(cpu_expr)
        for node_name in slurm_parse_nodes_info(nodes_expr):
            allocs[node_name] = cpu_count

    return allocs
208

akhuziy's avatar
akhuziy committed
209
def parse_job_info_slurm(raw_data, alloc_info):
    """Parse raw Slurm accounting output into a formatted job-info dict.

    Args:
        raw_data: dict mapping Slurm column names (e.g. "User") to raw
            string values, as produced by fetch_job_data_general.
        alloc_info: dict mapping node name -> allocated CUs, from
            slurm_fetch_alloc_info.

    Returns:
        dict mapping internal metric names to formatted values.

    Raises:
        ValueError: if a required Slurm column is missing from raw_data.
    """
    time_fields = ["submit_time", "start_time", "end_time"]
    dur_fields = ["run_time", "requested_time"]

    job_info = {}
    for m, par in _info.items():
        if par["slurm"] not in raw_data:
            # typo fix: was "Couln't"
            raise ValueError("Couldn't parse value", par["slurm"])
        job_info[m] = raw_data[par["slurm"]]

    # prepare raw data

    # dates to timestamp (sacct prints ISO-like timestamps without tz)
    for f in time_fields:
        job_info[f] = date_to_timestamp(job_info[f], "%Y-%m-%dT%H:%M:%S")

    # slurm duration strings ("[D-][HH:]MM:SS") to seconds
    for f in dur_fields:
        job_info[f] = slurm_time_to_sec(job_info[f])

    # reformat cu allocation into the LSF-style "cu*node:..." string
    job_info["alloc_slots"] = slurm_format_alloc(alloc_info)

    # format values according to their declared metric types
    for m, par in _info.items():
        job_info[m] = common.format_metric(_info, m, job_info[m])

    return job_info


def format_fields(m, val, metrics):
    """Format one metric as an InfluxDB line-protocol field "name=value".

    String-typed metrics are wrapped in double quotes as the line
    protocol requires.

    Args:
        m: internal metric name (key into metrics).
        val: already-formatted metric value.
        metrics: metric table (same shape as _info).
    """
    if metrics[m]["type"] == MType.STR:
        val = "\"{:s}\"".format(val)
    # consistency fix: use the metrics parameter throughout instead of
    # reaching for the _info global (identical for the existing caller,
    # which passes _info)
    return "{}={}".format(metrics[m]["rename"], val)


akhuziy's avatar
akhuziy committed
245
246
247
248
249
250
251
def format_influxdb_alloc(job_id, alloc_info, cur_time):
    """Build InfluxDB line-protocol records for per-node allocation info.

    One line per node in the "<measurement>-alloc" measurement, tagged
    with the job id and timestamped with cur_time (seconds).
    """
    mname = conf.job_info["measurement_name"] + "-alloc"

    lines = []
    for node_name, alloc_cu in alloc_info.items():
        record = "{:s},jobid={:s} ".format(mname, job_id)
        record += "node_name=\"{}\",".format(node_name)
        record += "alloc_cu={}".format(alloc_cu)
        record += " {:d}\n".format(cur_time)
        lines.append(record)

    return "".join(lines)

def format_influx_db_out(job_id, job_data, alloc_data):
    """Render job info plus allocation info as an InfluxDB write payload.

    Produces one line-protocol record for the job's metrics followed by
    the per-node allocation records, all sharing the current timestamp.
    """
    cur_time = int(time.time())
    mname = conf.job_info["measurement_name"]

    fields = ",".join(format_fields(k, v, _info) for k, v in job_data.items())

    req = "{:s},jobid={:s} ".format(mname, job_id)
    req += fields
    req += " {:d}\n".format(cur_time)
    req += format_influxdb_alloc(job_id, alloc_data, cur_time)

    return req
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306

def format_format_str(batch, qdelim):
    """Build the output-format string for the batch system's query tool.

    Joins the batch-specific column names from _info with qdelim; LSF
    additionally needs an explicit delimiter clause appended.
    """
    columns = [spec[batch] for spec in _info.values()]
    oformat = qdelim.join(columns)

    if conf.BATCH_SYSTEM == "LSF":
        oformat += " delimiter=','"

    return oformat


def fetch_job_data_general(job_id, batch, qdelim, rdelim):
    """Run the configured batch-system accounting query and parse its output.

    Args:
        job_id: job identifier passed to the query command.
        batch: batch-system key ("lsf" or "slurm") selecting the command
            template and column names.
        qdelim: delimiter used when building the query's format string.
        rdelim: delimiter used when parsing the command's CSV output.

    Returns:
        The first output record as a dict (column name -> raw value).
    """
    command_template = conf.job_info["fetch_job_info_{:s}".format(batch)]
    oformat = format_format_str(batch, qdelim)
    command = command_template.format(oformat, job_id)

    proc = subprocess.run(command, stdout=subprocess.PIPE, shell=True,
                          executable='/bin/bash')
    output = proc.stdout.decode("utf-8")

    reader = csv.DictReader(StringIO(output), delimiter=rdelim)
    return next(reader)


def fetch_job_data(job_id):
    """Fetch and parse job info for the configured batch system.

    Returns:
        (job_info, node_info): formatted job metrics and the per-node
        allocation mapping.

    Raises:
        ValueError: if conf.BATCH_SYSTEM is neither "LSF" nor "SLURM".
    """
    batchsys = conf.BATCH_SYSTEM

    if batchsys == "LSF":
        raw_info = fetch_job_data_general(job_id, "lsf", " ", ",")
        job_info, node_info = parse_job_info_lsf(raw_info)
        return job_info, node_info

    if batchsys == "SLURM":
        raw_info = fetch_job_data_general(job_id, "slurm", ",", "|")
        node_info = slurm_fetch_alloc_info(job_id)
        job_info = parse_job_info_slurm(raw_info, node_info)
        return job_info, node_info

    raise ValueError("Configured batch system is not supported")
316
317

def export_job_info(job_id):
    """Export job and allocation info for *job_id* to InfluxDB.

    Fetches the accounting data, renders it as line protocol and POSTs
    it to the configured InfluxDB /write endpoint.

    Returns:
        True on success.

    Raises:
        ValueError: if InfluxDB does not answer with HTTP 204.
    """
    job_info, alloc_info = fetch_job_data(job_id)
    fdata = format_influx_db_out(job_id, job_info, alloc_info)

    payload = {'db': confdb.IDB["database"], 'precision': 's'}
    write_url = "{:s}/write".format(confdb.IDB["api_url"])
    auth = HTTPBasicAuth(confdb.IDB["username"], confdb.IDB["password"])

    r = requests.post(write_url, auth=auth, params=payload, data=fdata)

    # InfluxDB acknowledges a successful write with 204 No Content
    if r.status_code != 204:
        raise ValueError("Couldn't write to InfluxDB", r.status_code, r.content)

    return True