Commit cf9493c4 authored by akhuziy's avatar akhuziy
Browse files

Added allocated-memory support for Slurm

parent 40070e20
...@@ -87,6 +87,7 @@ class NodeData: ...@@ -87,6 +87,7 @@ class NodeData:
seq_ib_xmit_max = None seq_ib_xmit_max = None
proc = None proc = None
alloc_cu = None alloc_cu = None
alloc_mem = None
ib_rcv_max = None ib_rcv_max = None
ib_xmit_max = None ib_xmit_max = None
beegfs_read_sum = None beegfs_read_sum = None
......
...@@ -31,16 +31,31 @@ def parse_job_response(data=None): ...@@ -31,16 +31,31 @@ def parse_job_response(data=None):
return job return job
def parse_alloc_res(alloc_info):
    """Parse a per-node allocation string into CU and memory maps.

    ``alloc_info`` is a ":"-separated list of node entries, each either
    ``<cu>*<mem>*<node_id>`` (Slurm entry carrying allocated memory) or
    ``<cu>*<node_id>`` (no memory information available).

    Returns a tuple ``(alloc_cu, alloc_mem)`` of dicts keyed by node id.
    ``alloc_mem[node_id]`` is -1 when the entry carried no memory info.

    Raises:
        RuntimeError: if an entry does not have 2 or 3 "*"-separated fields.
    """
    alloc_cu = {}
    alloc_mem = {}
    for entry in alloc_info.split(":"):
        resources = entry.split("*")
        if len(resources) == 3:
            # Tuple unpacking instead of positional indexing.
            cu, mem, node_id = resources
            fmem = common.format_metric(metrics.metrics_node, "alloc_mem", mem)
            alloc_mem[node_id] = fmem
        elif len(resources) == 2:
            cu, node_id = resources
            # -1 marks "memory unknown" so consumers can detect the gap.
            alloc_mem[node_id] = -1
        else:
            # Fixed typo in the original message ("resourses").
            raise RuntimeError("Allocated resources couldn't be parsed")
        fcu = common.format_metric(metrics.metrics_node, "alloc_cu", cu)
        alloc_cu[node_id] = fcu
    return alloc_cu, alloc_mem
def format_job_info(jobinfo_init): def format_job_info(jobinfo_init):
jobinfo = {} jobinfo = {}
...@@ -62,6 +77,6 @@ def get_job_data(job_id): ...@@ -62,6 +77,6 @@ def get_job_data(job_id):
jobinfo = format_job_info(jobdata_parsed) jobinfo = format_job_info(jobdata_parsed)
# Special case: Allocated CU # Special case: Allocated CU
alloc_cu = parse_alloc_cu(jobinfo["alloc_cu"]) alloc_cu, alloc_mem = parse_alloc_res(jobinfo["alloc_cu"])
return jobinfo, alloc_cu return jobinfo, alloc_cu, alloc_mem
...@@ -9,7 +9,7 @@ from . import common ...@@ -9,7 +9,7 @@ from . import common
def fetch_all(job_id, type): def fetch_all(job_id, type):
data = {} data = {}
data["job"], data["alloc_cu"] = get_job_data(job_id) data["job"], data["alloc_cu"], data["alloc_mem"] = get_job_data(job_id)
t_start = data["job"]["start_time"] t_start = data["job"]["start_time"]
t_end = t_start + data["job"]["run_time"] t_end = t_start + data["job"]["run_time"]
...@@ -40,6 +40,7 @@ def get_aggregator(job_id, type="text"): ...@@ -40,6 +40,7 @@ def get_aggregator(job_id, type="text"):
jd = data["job"] jd = data["job"]
cud = data["alloc_cu"] cud = data["alloc_cu"]
memd = data["alloc_mem"]
aggr.job.id = job_id aggr.job.id = job_id
aggr.job.user_name = jd.get("user_name") aggr.job.user_name = jd.get("user_name")
...@@ -74,7 +75,14 @@ def get_aggregator(job_id, type="text"): ...@@ -74,7 +75,14 @@ def get_aggregator(job_id, type="text"):
new_node.phys_thr_core = noded.get("phys_thr_core") new_node.phys_thr_core = noded.get("phys_thr_core")
new_node.main_mem = noded.get("main_mem") new_node.main_mem = noded.get("main_mem")
new_node.alloc_cu = cud.get(node_id) new_node.alloc_cu = cud.get(node_id)
amem = memd.get(node_id)
if amem > 0:
new_node.alloc_mem = memd.get(node_id)
else:
ncpus = noded.get("sockets") * noded.get("cores_per_socket") * noded.get("virt_thr_core")
nmem = int(cud.get(node_id) * noded.get("main_mem") / ncpus)
new_node.alloc_mem = nmem
# for now we just account hyperthreading # for now we just account hyperthreading
new_node.phys_thr_core = 1 new_node.phys_thr_core = 1
new_node.virt_thr_core -= 1 new_node.virt_thr_core -= 1
......
...@@ -152,6 +152,11 @@ metrics_node = { ...@@ -152,6 +152,11 @@ metrics_node = {
"measurement": conf.measurements["node"], "measurement": conf.measurements["node"],
"query": QType.CUSTOM, "query": QType.CUSTOM,
"type": MType.INT}, "type": MType.INT},
"alloc_mem": {
"dbname": "alloc_mem",
"measurement": conf.measurements["node"],
"query": QType.CUSTOM,
"type": MType.INT},
"ib_xmit_max": { "ib_xmit_max": {
"dbname": "PortXmitData", "dbname": "PortXmitData",
"measurement": conf.measurements["infiniband"], "measurement": conf.measurements["infiniband"],
......
...@@ -40,6 +40,7 @@ class OutputSchemes: ...@@ -40,6 +40,7 @@ class OutputSchemes:
"virt_threads_per_core": "virt_thr_core", "virt_threads_per_core": "virt_thr_core",
"available_main_mem": "main_mem", "available_main_mem": "main_mem",
"num_cores": "alloc_cu", "num_cores": "alloc_cu",
"alloc_mem": "alloc_mem",
"ib_rcv_max": "ib_rcv_max", "ib_rcv_max": "ib_rcv_max",
"ib_xmit_max": "ib_xmit_max", "ib_xmit_max": "ib_xmit_max",
"beegfs_read_sum": "beegfs_read_sum", "beegfs_read_sum": "beegfs_read_sum",
...@@ -107,6 +108,7 @@ class OutputSchemes: ...@@ -107,6 +108,7 @@ class OutputSchemes:
"virt_threads_per_core": "virt_thr_core", "virt_threads_per_core": "virt_thr_core",
"available_main_mem": "main_mem", "available_main_mem": "main_mem",
"num_cores": "alloc_cu", "num_cores": "alloc_cu",
"alloc_mem": "alloc_mem",
"ib_rcv_max": "ib_rcv_max", "ib_rcv_max": "ib_rcv_max",
"ib_xmit_max": "ib_xmit_max", "ib_xmit_max": "ib_xmit_max",
"beegfs_read_sum": "beegfs_read_sum", "beegfs_read_sum": "beegfs_read_sum",
......
...@@ -109,7 +109,7 @@ def parse_job_info_lsf(raw_data): ...@@ -109,7 +109,7 @@ def parse_job_info_lsf(raw_data):
nodes_raw = raw_data[job_info["alloc_slots"]].split(":") nodes_raw = raw_data[job_info["alloc_slots"]].split(":")
for n in nodes_raw: for n in nodes_raw:
cu, node_id = n.split("*") cu, node_id = n.split("*")
alloc_info[node_id] = int(cu) alloc_info[node_id] = {"cpu": int(cu), "mem": 0}
# format values # format values
for m, par in _info.items(): for m, par in _info.items():
...@@ -147,7 +147,7 @@ def slurm_time_to_sec(slurm_time): ...@@ -147,7 +147,7 @@ def slurm_time_to_sec(slurm_time):
def slurm_format_alloc(nodes_info):
    """Serialize per-node allocation data as "cpu*mem*node" entries joined by ":".

    ``nodes_info`` maps node name -> {"cpu": ..., "mem": ...}.
    """
    parts = []
    for node_name, res in nodes_info.items():
        parts.append("{}*{}*{}".format(res["cpu"], res["mem"], node_name))
    return ":".join(parts)
...@@ -191,18 +191,22 @@ def slurm_fetch_alloc_info(job_id): ...@@ -191,18 +191,22 @@ def slurm_fetch_alloc_info(job_id):
executable='/bin/bash') executable='/bin/bash')
out = result.stdout.decode("utf-8") out = result.stdout.decode("utf-8")
alloc_raw = re.findall("Nodes=([^ ]+) +CPU_IDs=([^ ]+)", out) alloc_raw = re.findall("Nodes=([^ ]+) +CPU_IDs=([^ ]+) +Mem=([ 0-9]+)", out)
allocs = {} allocs = {}
if len(alloc_raw) == 0: if len(alloc_raw) == 0:
raise ValueError("Cannot parse Slurm allocation info", out) raise ValueError("Cannot parse Slurm allocation info", out)
for (nodes_list_raw, cpu_list_raw) in alloc_raw: for (nodes_list_raw, cpu_list_raw, mem_alloc_raw) in alloc_raw:
nodes_list = slurm_parse_nodes_info(nodes_list_raw) nodes_list = slurm_parse_nodes_info(nodes_list_raw)
cpu_count = slurm_parse_cpu_info(cpu_list_raw) cpu_count = slurm_parse_cpu_info(cpu_list_raw)
if re.match(r"[0-9]+", mem_alloc_raw):
mem_alloc = int(mem_alloc_raw)
else:
mem_alloc = ""
for node_name in nodes_list: for node_name in nodes_list:
allocs[node_name] = cpu_count allocs[node_name] = {"cpu": cpu_count, "mem": mem_alloc}
return allocs return allocs
...@@ -220,7 +224,10 @@ def parse_job_info_slurm(raw_data, alloc_info): ...@@ -220,7 +224,10 @@ def parse_job_info_slurm(raw_data, alloc_info):
# dates to timestamp # dates to timestamp
for f in time_fields: for f in time_fields:
job_info[f] = date_to_timestamp(job_info[f], "%Y-%m-%dT%H:%M:%S") if f == "end_time" and job_info[f] == "Unknown":
job_info[f] = 0
else:
job_info[f] = date_to_timestamp(job_info[f], "%Y-%m-%dT%H:%M:%S")
# slurm times to seconds # slurm times to seconds
for f in dur_fields: for f in dur_fields:
...@@ -246,10 +253,13 @@ def format_influxdb_alloc(job_id, alloc_info, cur_time): ...@@ -246,10 +253,13 @@ def format_influxdb_alloc(job_id, alloc_info, cur_time):
mname = conf.job_info["measurement_name"] + "-alloc" mname = conf.job_info["measurement_name"] + "-alloc"
req = "" req = ""
for node_name, alloc_cu in alloc_info.items(): for node_name, alloc_data in alloc_info.items():
alloc_cu = alloc_data["cpu"]
alloc_mem = alloc_data["mem"]
req += "{:s},jobid={:s},host={:s} ".format(mname, job_id, node_name) req += "{:s},jobid={:s},host={:s} ".format(mname, job_id, node_name)
req += "alloc_cu={}".format(alloc_cu) req += "alloc_cu={},".format(alloc_cu)
req += "alloc_mem={}".format(alloc_mem)
req += " {:d}\n".format(cur_time) req += " {:d}\n".format(cur_time)
...@@ -318,7 +328,7 @@ def export_job_info(job_id): ...@@ -318,7 +328,7 @@ def export_job_info(job_id):
fdata = format_influx_db_out(job_id, job_info, alloc_info) fdata = format_influx_db_out(job_id, job_info, alloc_info)
payload = {'db': confdb.IDB["database"], 'precision': 's'} payload = {'db': confdb.IDB["database"], 'precision': 's'}
write_url = "{:s}/write".format(confdb.IDB["api_url"]) write_url = "{:s}/write".format(confdb.IDB["api_url"])
r = requests.post(write_url, auth=HTTPBasicAuth( r = requests.post(write_url, auth=HTTPBasicAuth(
confdb.IDB["username"], confdb.IDB["password"]), confdb.IDB["username"], confdb.IDB["password"]),
......
...@@ -45,6 +45,7 @@ class NodeData: ...@@ -45,6 +45,7 @@ class NodeData:
seq_ib_xmit_max = None seq_ib_xmit_max = None
proc = None proc = None
alloc_cu = None alloc_cu = None
alloc_mem = None
ib_rcv_max = None ib_rcv_max = None
ib_xmit_max = None ib_xmit_max = None
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment