Commit f7815360 authored by Azat Khuziyakhmetov's avatar Azat Khuziyakhmetov
Browse files

extended alloc info + bug fix

parent 043ebf01
......@@ -102,6 +102,15 @@ def parse_job_info_lsf(raw_data):
for f, r in reg_fields.items():
job_info[f] = get_by_regex(job_info[f], r)
# reformat alloc info
alloc_info = {}
nodes_raw = raw_data[job_info["alloc_slots"]].split(":")
for n in nodes_raw:
cu, node_id = n.split("*")
alloc_info[node_id] = int(cu)
# format values
for m, par in _info.items():
job_info[m] = common.format_metric(_info, m, job_info[m])
......@@ -112,7 +121,7 @@ def parse_job_info_lsf(raw_data):
if job_info["requested_time"] is not None:
job_info["requested_time"] *= 60
return job_info
return job_info, alloc_info
def slurm_time_to_sec(slurm_time):
......@@ -137,25 +146,67 @@ def slurm_time_to_sec(slurm_time):
return secs
def slurm_format_alloc(slurm_alloc, nn, cpus):
fetch_comm = "scontrol show hostname {:s}".format(slurm_alloc)
def slurm_format_alloc(nodes_info):
formatted = ":".join("{}*{}".format(nodes_info[n], n) for n in nodes_info)
return formatted
def slurm_parse_nodes_info(nodes):
slurm_list_set = [",","[","-"]
is_list = False
for c in slurm_list_set:
if c in nodes:
is_list = True
break
if is_list:
tolist_comm = "scontrol show hostname {:s}".format(nodes)
result = subprocess.run(tolist_comm, stdout=subprocess.PIPE, shell=True,
executable='/bin/bash')
out = result.stdout.decode("utf-8")
out = out.splitlines()
else:
out = [nodes]
ret = out
return ret
def slurm_parse_cpu_info(cpus):
cpus_list = cpus.split(",")
cpu_count = 0
for cpu_info in cpus_list:
if "-" in cpu_info:
a_and_b = cpu_info.split("-")
cpu_count += int(a_and_b[1]) - int(a_and_b[0]) + 1
else:
cpu_count += int(cpu_info)
return cpu_count
def slurm_fetch_alloc_info(job_id):
fetch_comm = "scontrol show job -d {:s}".format(job_id)
result = subprocess.run(fetch_comm, stdout=subprocess.PIPE, shell=True,
executable='/bin/bash')
out = result.stdout.decode("utf-8")
nodes = out.split()
alloc_raw = re.findall("Nodes=([^ ]+) +CPU_IDs=([^ ]+)", out)
allocs = {}
if len(nodes) != int(nn):
raise ValueError("Number of nodes seems to be wrong", nodes, nn)
if len(alloc_raw) == 0:
raise ValueError("Cannot parse Slurm allocation info", out)
cpupernode = int(int(cpus) / int(nn))
for (nodes_list_raw, cpu_list_raw) in alloc_raw:
nodes_list = slurm_parse_nodes_info(nodes_list_raw)
cpu_count = slurm_parse_cpu_info(cpu_list_raw)
formatted = ":".join("{}*{}".format(cpupernode, n) for n in nodes)
return formatted
for node_name in nodes_list:
allocs[node_name] = cpu_count
return allocs
def parse_job_info_slurm(raw_data):
def parse_job_info_slurm(raw_data, alloc_info):
time_fields = ["submit_time", "start_time", "end_time"]
dur_fields = ["run_time", "requested_time"]
......@@ -176,8 +227,7 @@ def parse_job_info_slurm(raw_data):
job_info[f] = slurm_time_to_sec(job_info[f])
# reformat cu allocation
job_info["alloc_slots"] = slurm_format_alloc(
job_info["alloc_slots"], job_info["num_nodes"], job_info["requested_cu"])
job_info["alloc_slots"] = slurm_format_alloc(alloc_info)
# format values
for m, par in _info.items():
......@@ -192,17 +242,35 @@ def format_fields(m, val, metrics):
return "{}={}".format(_info[m]["rename"], val)
def format_influx_db_out(data, job_id):
def format_influxdb_alloc(job_id, alloc_info, cur_time):
mname = conf.job_info["measurement_name"] + "-alloc"
req = ""
for node_name, alloc_cu in alloc_info.items():
req += "{:s},jobid={:s} ".format(mname, job_id)
req += "node_name={},".format(node_name)
req += "alloc_cu={}".format(alloc_cu)
req += " {:d};".format(cur_time)
return req
def format_influx_db_out(job_id, job_data, alloc_data):
cur_time = int(time.time())
mname = conf.job_info["measurement_name"]
req = "{:s},jobid={:s} ".format(mname, job_id)
req += ",".join(format_fields(k, v, _info) for k, v in data.items())
req += ",".join(format_fields(k, v, _info) for k, v in job_data.items())
req += " {:d}".format(int(time.time()))
req += " {:d};".format(cur_time)
return req
alloc_influx = format_influxdb_alloc(job_id, alloc_data, cur_time)
req += alloc_influx
return req
def format_format_str(batch, qdelim):
oformat = qdelim.join(val[batch] for key, val in _info.items())
......@@ -236,20 +304,20 @@ def fetch_job_data(job_id):
batchsys = conf.BATCH_SYSTEM
if batchsys == "LSF":
raw_info = fetch_job_data_general(job_id, "lsf", " ", ",")
job_info = parse_job_info_lsf(raw_info)
job_info, node_info = parse_job_info_lsf(raw_info)
elif batchsys == "SLURM":
raw_info = fetch_job_data_general(job_id, "slurm", ",", "|")
job_info = parse_job_info_slurm(raw_info)
node_info = slurm_fetch_alloc_info(job_id)
job_info = parse_job_info_slurm(raw_info, node_info)
else:
raise ValueError("Configured batch system is not supported")
return job_info
return job_info, node_info
def export_job_info(job_id):
job_info = fetch_job_data(job_id)
job_info, alloc_info = fetch_job_data(job_id)
fdata = format_influx_db_out(job_info, job_id)
fdata = format_influx_db_out(job_id, job_info, alloc_info)
payload = {'db': confdb.IDB["database"], 'precision': 's'}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment