Commit 1078f63d authored by akhuziy

added beegfs metrics

parent ea3e1409
@@ -17,6 +17,9 @@ measurements = {
"sys": 'system',
"cpu": 'cpu',
"infiniband": 'infiniband',
"beegfs": 'beegfs_clients',
"gpu_node": 'nvidia_gpu',
"gpu_proc": 'nvidia_proc',
}
# DB
@@ -33,6 +36,7 @@ BATCH_SYSTEM = "SLURM" # LSF, SLURM
# Network
INFINIBAND = True # aggregate infiniband data
BEEGFS = True # aggregate beegfs data
# PDF report
SEQ_MAX_POINTS = 500
@@ -67,6 +67,8 @@ class NodeData:
alloc_cu = None
ib_rcv_max = None
ib_xmit_max = None
beegfs_read_sum = None
beegfs_write_sum = None
def __init__(self):
self.proc = ProcData()
@@ -75,6 +77,8 @@ class NodeData:
self.seq_cpu_usage = SeqVals()
self.seq_ib_rcv_max = SeqVals()
self.seq_ib_xmit_max = SeqVals()
self.seq_beegfs_read_sum = SeqVals()
self.seq_beegfs_write_sum = SeqVals()
class Aggregator:
......
@@ -25,6 +25,15 @@ def get_query_max_of_node(node_id, msrmnt, metric, metric_db, t_start, t_end):
return query
def get_query_sum_of_node(node_id, msrmnt, metric, metric_db, t_start, t_end):
query = (
'SELECT sum("{:s}") as "{:s}" FROM "{:s}" '
'WHERE time >= {:d}s AND time <= {:d}s AND '
'"host" = \'{:s}\' GROUP BY "host";'
).format(metric_db, metric, msrmnt, t_start, t_end, node_id)
return query
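For orientation, a quick sketch of the InfluxQL this builder renders; "node01" and the epoch timestamps are hypothetical placeholders, not values from the commit:

q = get_query_sum_of_node("node01", "beegfs_clients", "beegfs_read_sum",
                          "MiB-rd", 1500000000, 1500003600)
# q is now:
# SELECT sum("MiB-rd") as "beegfs_read_sum" FROM "beegfs_clients"
# WHERE time >= 1500000000s AND time <= 1500003600s AND "host" = 'node01' GROUP BY "host";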
def get_node_queries(nodes, t_start, t_end):
queries = ""
@@ -35,6 +44,8 @@ def get_node_queries(nodes, t_start, t_end):
for m, par in metrics.metrics_node.items():
if par["measurement"] == conf.measurements["infiniband"] and conf.INFINIBAND is False:
continue
if par["measurement"] == conf.measurements["beegfs"] and conf.BEEGFS is False:
continue
if par["query"] is metrics.QType.MAX_OF_NODE:
q = get_query_max_of_node(n, par["measurement"], m, par["dbname"], t_start, t_end)
@@ -42,12 +53,26 @@
return queries
def parse_node_response(data=None):
def parse_node_response(data, queries):
nodes = defaultdict(dict)
queries = queries.split(";")
for i in range(0, len(data)):
if "series" not in data[i]:
raise RuntimeError("Some queries were not executed", data[i])
if "statement_id" in data[i]:
sid = data[i]["statement_id"]
if len(queries) > sid:
if conf.DEBUG:
print("The query was not executed", queries[sid])
else:
if conf.DEBUG:
print("Some queries were not executed", data[i])
else:
raise RuntimeError("Cannot parse the output", data[i])
continue
for s in range(0, len(data[i]['series'])):
for m in range(0, len(data[i]['series'][0]['columns'])):
host = data[i]['series'][s]['tags']['host']
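To make the new error handling concrete, here is a hedged sketch of the result list the parser walks; the values are invented, but the statement_id/series layout follows the InfluxDB HTTP API:

data = [
    # Statement 0 succeeded: it carries a "series" list with tags and columns.
    {"statement_id": 0,
     "series": [{"name": "beegfs_clients",
                 "tags": {"host": "node01"},
                 "columns": ["time", "beegfs_read_sum"],
                 "values": [[1500000000, 12.5]]}]},
    # Statement 1 matched no data: no "series", only the statement_id,
    # which the parser maps back to queries[1] for the debug message.
    {"statement_id": 1},
]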
@@ -66,15 +91,6 @@ def format_node_info(nodeinfo_init):
value = common.format_metric(metrics.metrics_node, name, raw_value)
nodeinfo[n][name] = value
nodeinfo[n]["phys_thr_core"] = 1
nodeinfo[n]["virt_thr_core"] -= 1
# Multiply infiniband metrics by 4 if exist. To get bytes
if "ib_rcv_max" in nodeinfo[n] and nodeinfo[n]["ib_rcv_max"] is not None:
nodeinfo[n]["ib_rcv_max"] *= 4
if "ib_xmit_max" in nodeinfo[n] and nodeinfo[n]["ib_xmit_max"] is not None:
nodeinfo[n]["ib_xmit_max"] *= 4
return nodeinfo
def get_node_data(nodes, t_start, t_end):
@@ -84,7 +100,7 @@ def get_node_data(nodes, t_start, t_end):
nodedata_raw = common.fetch_data(queries)
nodedata_parsed = parse_node_response(nodedata_raw)
nodedata_parsed = parse_node_response(nodedata_raw, queries)
nodeinfo = format_node_info(nodedata_parsed)
......
@@ -64,12 +64,26 @@ def get_proc_queries(job_id):
return queries
def parse_proc_response(data=None):
def parse_proc_response(data, queries):
nodes = defaultdict(dict)
queries = queries.split(";")
for i in range(0, len(data)):
if "series" not in data[i]:
raise RuntimeError("Some queries were not executed", data[i])
if "statement_id" in data[i]:
sid = data[i]["statement_id"]
if len(queries) > sid:
if conf.DEBUG:
print("The query was not executed", queries[sid])
else:
if conf.DEBUG:
print("Some queries were not executed", data[i])
else:
raise RuntimeError("Cannot parse the output", data[i])
continue
for s in range(0, len(data[i]['series'])):
metric = data[i]['series'][s]['columns'][1]
host = data[i]['series'][s]['tags']['host']
@@ -94,7 +108,7 @@ def get_proc_data(job_id):
procdata_raw = common.fetch_data(query)
procdata_parsed = parse_proc_response(procdata_raw)
procdata_parsed = parse_proc_response(procdata_raw, query)
procdata = format_data(procdata_parsed)
......
@@ -26,8 +26,8 @@ def get_query_aggr_per_node_for_job(aggr, metric, job_id, metric_db, measurement
return query
def get_query_aggr_for_nodes(gaggr, aggr, metric, nodes, metric_db, measurement, delta, t_start, t_end):
aggr = '{:s}("{:s}") as "{:s}"'.format(aggr, metric_db, metric)
def get_query_aggr_for_nodes_double_select(gaggr, aggr, metric, nodes, metric_db, measurement, delta, t_start, t_end):
aggrf = '{:s}("{:s}") as "{:s}"'.format(aggr, metric_db, metric)
node_cond = ' OR '.join("host='{:s}'".format(n) for n in nodes)
@@ -39,22 +39,40 @@ def get_query_aggr_for_nodes(gaggr, aggr, metric, nodes, metric_db, measurement,
'GROUP BY "host", time({dbdelta}s)) '
'WHERE time >= {start}s AND time <= {end}s '
'GROUP BY "host", time({delta}s);'
).format(gagr = gaggr, agr = aggr, mes = measurement, nodes = node_cond,
).format(gagr = gaggr, agr = aggrf, mes = measurement, nodes = node_cond,
start = t_start, end = t_end, delta = delta, met = metric,
dbdelta = DB_INTERVAL)
return query
def get_query_aggr_for_nodes(aggr, metric, nodes, metric_db, measurement, delta, t_start, t_end):
aggrf = '{:s}("{:s}") as "{:s}"'.format(aggr, metric_db, metric)
node_cond = ' OR '.join("host='{:s}'".format(n) for n in nodes)
query = (
'SELECT {agr} FROM "{mes}" '
'WHERE ({nodes}) '
'AND time >= {start}s AND time <= {end}s '
'GROUP BY "host", time({delta}s);'
).format(agr = aggrf, mes = measurement, nodes = node_cond,
start = t_start, end = t_end, delta = delta)
return query
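A sketch of what the new single-select builder renders; the node names, interval, and timestamps are hypothetical:

q = get_query_aggr_for_nodes("sum", "node_beegfs_read_sum", ["node01", "node02"],
                             "MiB-rd", "beegfs_clients", 60, 1500000000, 1500003600)
# q is now:
# SELECT sum("MiB-rd") as "node_beegfs_read_sum" FROM "beegfs_clients"
# WHERE (host='node01' OR host='node02') AND time >= 1500000000s AND time <= 1500003600s
# GROUP BY "host", time(60s);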
def get_query_sum_per_node_of_job(metric, job_id, metric_db, measurement, delta, t_start, t_end):
return get_query_aggr_per_node_for_job("sum", metric, job_id, metric_db, measurement, delta, t_start, t_end)
def get_query_avg_of_node(metric, nodes, metric_db, measurement, delta, t_start, t_end):
return get_query_aggr_for_nodes("mean", "mean", metric, nodes, metric_db, measurement, delta, t_start, t_end)
return get_query_aggr_for_nodes("mean", metric, nodes, metric_db, measurement, delta, t_start, t_end)
def get_query_sum_of_node(metric, nodes, metric_db, measurement, delta, t_start, t_end):
return get_query_aggr_for_nodes("sum", metric, nodes, metric_db, measurement, delta, t_start, t_end)
def get_query_max_of_node(metric, nodes, metric_db, measurement, delta, t_start, t_end):
return get_query_aggr_for_nodes("max", "max", metric, nodes, metric_db, measurement, delta, t_start, t_end)
return get_query_aggr_for_nodes("max", metric, nodes, metric_db, measurement, delta, t_start, t_end)
def get_seq_queries(job_id, t_start, t_end, delta, nodes):
@@ -74,6 +92,9 @@ def get_seq_queries(job_id, t_start, t_end, delta, nodes):
elif par["query"] is metrics.QType.MAX_OF_NODE:
q = get_query_max_of_node(
m, nodes, par["dbname"], par["measurement"], delta, t_start, t_end)
elif par["query"] is metrics.QType.SUM_OF_NODE:
q = get_query_sum_of_node(
m, nodes, par["dbname"], par["measurement"], delta, t_start, t_end)
else:
raise ValueError("Unknown query type: {:s}".format(par["query"]))
@@ -91,12 +112,26 @@ def format_seq_val(metric, val):
return fmtd
def parse_seq_response(data=None):
def parse_seq_response(data, queries):
nodes = defaultdict(dict)
queries = queries.split(";")
for i in range(0, len(data)):
if "series" not in data[i]:
raise RuntimeError("Some queries were not executed", data[i])
if "statement_id" in data[i]:
sid = data[i]["statement_id"]
if len(queries) > sid:
if conf.DEBUG:
print("The query was not executed", queries[sid])
else:
if conf.DEBUG:
print("Some queries were not executed", data[i])
else:
raise RuntimeError("Cannot parse the output", data[i])
continue
for s in range(0, len(data[i]['series'])):
serie = data[i]['series'][s]
metric = serie['columns'][1]
@@ -133,7 +168,7 @@ def get_seq_data(job_id, t_start, t_end, nodes):
seqdata_raw = common.fetch_data(query)
seqdata_parsed = parse_seq_response(seqdata_raw)
seqdata_parsed = parse_seq_response(seqdata_raw, query)
if conf.DEBUG:
for host in seqdata_parsed:
......
@@ -31,15 +31,15 @@ def get_aggregator(job_id, type="text"):
cud = data["alloc_cu"]
aggr.job.id = job_id
aggr.job.user_name = jd["user_name"]
aggr.job.used_queue = jd["used_queue"]
aggr.job.submit_time = jd["submit_time"]
aggr.job.start_time = jd["start_time"]
aggr.job.end_time = jd["start_time"] + jd["run_time"]
aggr.job.run_time = jd["run_time"]
aggr.job.requested_time = jd["requested_time"]
aggr.job.requested_cu = jd["requested_cu"]
aggr.job.num_nodes = jd["num_nodes"]
aggr.job.user_name = jd.get("user_name")
aggr.job.used_queue = jd.get("used_queue")
aggr.job.submit_time = jd.get("submit_time")
aggr.job.start_time = jd.get("start_time")
start_time, run_time = jd.get("start_time"), jd.get("run_time")
aggr.job.end_time = start_time + run_time if start_time is not None and run_time is not None else None
aggr.job.run_time = jd.get("run_time")
aggr.job.requested_time = jd.get("requested_time")
aggr.job.requested_cu = jd.get("requested_cu")
aggr.job.num_nodes = jd.get("num_nodes")
aggr.job.alloc_cu = []
for node_id, cu_count in cud.items():
@@ -50,50 +50,82 @@ def get_aggregator(job_id, type="text"):
new_node = NodeData()
new_node.name = node_id
new_node.cpu_model = noded["cpu_model"]
new_node.sockets = noded["sockets"]
new_node.cores_per_socket = noded["cores_per_socket"]
new_node.virt_thr_core = noded["virt_thr_core"]
new_node.phys_thr_core = noded["phys_thr_core"]
new_node.main_mem = noded["main_mem"]
new_node.alloc_cu = cud[node_id]
new_node.cpu_model = noded.get("cpu_model")
new_node.sockets = noded.get("sockets")
new_node.cores_per_socket = noded.get("cores_per_socket")
new_node.virt_thr_core = noded.get("virt_thr_core")
new_node.phys_thr_core = noded.get("phys_thr_core")
new_node.main_mem = noded.get("main_mem")
new_node.alloc_cu = cud.get(node_id)
# For now we only account for hyperthreading: one hardware thread per core counts as physical, the rest as virtual
new_node.phys_thr_core = 1
if new_node.virt_thr_core is not None:
new_node.virt_thr_core -= 1
if conf.INFINIBAND is True:
new_node.ib_rcv_max = noded["ib_rcv_max"]
new_node.ib_xmit_max = noded["ib_xmit_max"]
procd = data["proc"][node_id]
new_node.proc.cpu_time_user = procd["cpu_time_user"]
new_node.proc.cpu_time_system = procd["cpu_time_system"]
new_node.proc.cpu_time_idle = procd["cpu_time_idle"]
new_node.proc.cpu_time_iowait = procd["cpu_time_iowait"]
new_node.proc.write_bytes = procd["write_bytes"]
new_node.proc.write_count = procd["write_count"]
new_node.proc.read_bytes = procd["read_bytes"]
new_node.proc.read_count = procd["read_count"]
new_node.proc.mem_rss_max = procd["mem_rss_max"]
new_node.proc.mem_rss_avg = procd["mem_rss_avg"]
new_node.proc.mem_swap_max = procd["mem_swap_max"]
new_node.proc.mem_vms = procd["mem_vms"]
new_node.proc.cpu_usage = procd["cpu_usage"]
new_node.ib_rcv_max = noded.get("ib_rcv_max")
new_node.ib_xmit_max = noded.get("ib_xmit_max")
# InfiniBand counters are in 4-byte words; multiply by 4, if present, to get bytes
if new_node.ib_rcv_max is not None:
new_node.ib_rcv_max *= 4
if new_node.ib_xmit_max is not None:
new_node.ib_xmit_max *= 4
if conf.BEEGFS is True:
new_node.beegfs_read_sum = noded.get("beegfs_read_sum")
new_node.beegfs_write_sum = noded.get("beegfs_write_sum")
# Multiply BeeGFS metrics by 1024, if present, to get bytes
if new_node.beegfs_read_sum is not None:
new_node.beegfs_read_sum *= 1024
if new_node.beegfs_write_sum is not None:
new_node.beegfs_write_sum *= 1024
procd = data["proc"].get(node_id)
new_node.proc.cpu_time_user = procd.get("cpu_time_user")
new_node.proc.cpu_time_system = procd.get("cpu_time_system")
new_node.proc.cpu_time_idle = procd.get("cpu_time_idle")
new_node.proc.cpu_time_iowait = procd.get("cpu_time_iowait")
new_node.proc.write_bytes = procd.get("write_bytes")
new_node.proc.write_count = procd.get("write_count")
new_node.proc.read_bytes = procd.get("read_bytes")
new_node.proc.read_count = procd.get("read_count")
new_node.proc.mem_rss_max = procd.get("mem_rss_max")
new_node.proc.mem_rss_avg = procd.get("mem_rss_avg")
new_node.proc.mem_swap_max = procd.get("mem_swap_max")
new_node.proc.mem_vms = procd.get("mem_vms")
new_node.proc.cpu_usage = procd.get("cpu_usage")
if type == "pdf":
new_node.proc.seq_cpu_usage = SeqVals(data["seq_delta"], data["seq"][node_id]["proc_cpu_usage_sum"])
new_node.proc.seq_mem_rss_sum = SeqVals(data["seq_delta"], data["seq"][node_id]["proc_mem_rss_sum"])
new_node.proc.seq_write_sum = SeqVals(data["seq_delta"], data["seq"][node_id]["proc_write_sum"])
new_node.proc.seq_read_sum = SeqVals(data["seq_delta"], data["seq"][node_id]["proc_read_sum"])
new_node.seq_cpu_usage = SeqVals(data["seq_delta"], data["seq"][node_id]["node_cpu_idle_avg"])
new_node.proc.seq_cpu_usage = SeqVals(data["seq_delta"], data["seq"][node_id].get("proc_cpu_usage_sum"))
new_node.proc.seq_mem_rss_sum = SeqVals(data["seq_delta"], data["seq"][node_id].get("proc_mem_rss_sum"))
new_node.proc.seq_write_sum = SeqVals(data["seq_delta"], data["seq"][node_id].get("proc_write_sum"))
new_node.proc.seq_read_sum = SeqVals(data["seq_delta"], data["seq"][node_id].get("proc_read_sum"))
new_node.seq_cpu_usage = SeqVals(data["seq_delta"], data["seq"][node_id].get("node_cpu_idle_avg"))
# Derive CPU usage from the recorded idle percentage
if new_node.seq_cpu_usage.seq is not None:
new_node.seq_cpu_usage.seq = [None if x is None else (100 - x) for x in new_node.seq_cpu_usage.seq]
new_node.seq_load_avg = SeqVals(data["seq_delta"], data["seq"][node_id]["node_load_avg"])
new_node.seq_load_max = SeqVals(data["seq_delta"], data["seq"][node_id]["node_load_max"])
new_node.seq_load_avg = SeqVals(data["seq_delta"], data["seq"][node_id].get("node_load_avg"))
new_node.seq_load_max = SeqVals(data["seq_delta"], data["seq"][node_id].get("node_load_max"))
if conf.INFINIBAND is True:
new_node.seq_ib_rcv_max = SeqVals(data["seq_delta"], data["seq"][node_id]["node_ib_rcv_max"])
new_node.seq_ib_xmit_max = SeqVals(data["seq_delta"], data["seq"][node_id]["node_ib_xmit_max"])
new_node.seq_ib_rcv_max = SeqVals(data["seq_delta"], data["seq"][node_id].get("node_ib_rcv_max"))
new_node.seq_ib_xmit_max = SeqVals(data["seq_delta"], data["seq"][node_id].get("node_ib_xmit_max"))
# Multiply values by 4 in order to get bytes
new_node.seq_ib_rcv_max.seq = [None if x is None else (4 * x) for x in new_node.seq_ib_rcv_max.seq]
new_node.seq_ib_xmit_max.seq = [None if x is None else (4 * x) for x in new_node.seq_ib_xmit_max.seq]
if new_node.seq_ib_rcv_max.seq is not None:
new_node.seq_ib_rcv_max.seq = [None if x is None else (4 * x) for x in new_node.seq_ib_rcv_max.seq]
if new_node.seq_ib_xmit_max.seq is not None:
new_node.seq_ib_xmit_max.seq = [None if x is None else (4 * x) for x in new_node.seq_ib_xmit_max.seq]
if conf.BEEGFS is True:
new_node.seq_beegfs_read_sum = SeqVals(data["seq_delta"], data["seq"][node_id].get("node_beegfs_read_sum"))
new_node.seq_beegfs_write_sum = SeqVals(data["seq_delta"], data["seq"][node_id].get("node_beegfs_write_sum"))
# Multiply values by 1024 in order to get bytes, matching the scaling of the aggregate BeeGFS metrics above
if new_node.seq_beegfs_read_sum.seq is not None:
new_node.seq_beegfs_read_sum.seq = [None if x is None else (1024 * x) for x in new_node.seq_beegfs_read_sum.seq]
if new_node.seq_beegfs_write_sum.seq is not None:
new_node.seq_beegfs_write_sum.seq = [None if x is None else (1024 * x) for x in new_node.seq_beegfs_write_sum.seq]
aggr.nodes.append(new_node)
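The four guarded scalings above share one shape; a small helper along these lines (hypothetical, not part of this commit, assuming a SeqVals-like object with a .seq list) would remove the duplication:

def scale_seq(seqvals, factor):
    # Scale every sample in place, preserving gaps (None entries).
    if seqvals.seq is not None:
        seqvals.seq = [None if x is None else factor * x for x in seqvals.seq]

# Usage: scale_seq(new_node.seq_ib_rcv_max, 4); scale_seq(new_node.seq_beegfs_read_sum, 1024)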
......
@@ -9,6 +9,7 @@ class QType(Enum):
SUM_PER_NODE = 4
AVG_OF_NODE = 5
MAX_OF_NODE = 6
SUM_OF_NODE = 7
class MType(Enum):
@@ -153,6 +154,18 @@ metrics_node = {
"measurement": conf.measurements["infiniband"],
"query": QType.MAX_OF_NODE,
"type": MType.INT},
"beegfs_write_sum": {
"dbname": "MiB-wr",
"type": MType.INT,
"query": QType.SUM_OF_NODE,
"measurement": conf.measurements["beegfs"],
},
"beegfs_read_sum": {
"dbname": "MiB-rd",
"type": MType.INT,
"query": QType.SUM_OF_NODE,
"measurement": conf.measurements["beegfs"],
},
}
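How such an entry drives query generation, in a hedged sketch that mirrors the dispatch shown in the node-data hunks above (module prefixes dropped for brevity; node name and timestamps hypothetical):

par = metrics_node["beegfs_read_sum"]
if par["query"] is QType.SUM_OF_NODE:
    q = get_query_sum_of_node("node01", par["measurement"], "beegfs_read_sum",
                              par["dbname"], 1500000000, 1500003600)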
metrics_seq = {
@@ -216,4 +229,16 @@ metrics_seq = {
"query": QType.MAX_OF_NODE,
"measurement": conf.measurements["infiniband"],
},
"node_beegfs_write_sum": {
"dbname": "MiB-wr",
"type": MType.INT,
"query": QType.SUM_OF_NODE,
"measurement": conf.measurements["beegfs"],
},
"node_beegfs_read_sum": {
"dbname": "MiB-rd",
"type": MType.INT,
"query": QType.SUM_OF_NODE,
"measurement": conf.measurements["beegfs"],
},
}
@@ -39,9 +39,11 @@ class OutputSchemes:
"phys_threads_per_core": "phys_thr_core",
"virt_threads_per_core": "virt_thr_core",
"available_main_mem": "main_mem",
"num_cores": "alloc_cu",
"ib_rcv_max": "ib_rcv_max",
"ib_xmit_max": "ib_xmit_max",
"num_cores": "alloc_cu",
"beegfs_read_sum": "beegfs_read_sum",
"beegfs_write_sum": "beegfs_write_sum",
}
}
}
@@ -87,9 +89,11 @@ class OutputSchemes:
"phys_threads_per_core": "phys_thr_core",
"virt_threads_per_core": "virt_thr_core",
"available_main_mem": "main_mem",
"num_cores": "alloc_cu",
"ib_rcv_max": "ib_rcv_max",
"ib_xmit_max": "ib_xmit_max",
"num_cores": "alloc_cu",
"beegfs_read_sum": "beegfs_read_sum",
"beegfs_write_sum": "beegfs_write_sum",
"dynamic": {
"node_cpu_usage": {
"type": "seq",
@@ -111,6 +115,16 @@ class OutputSchemes:
"path": "seq_ib_xmit_max",
"scheme": "std"
},
"node_beegfs_read_sum": {
"type": "seq",
"path": "seq_beegfs_read_sum",
"scheme": "std"
},
"node_beegfs_write_sum": {
"type": "seq",
"path": "seq_beegfs_write_sum",
"scheme": "std"
},
"proc_cpu_usage": {
"type": "seq",
"path": "proc.seq_cpu_usage",
......
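The "path" values in these schemes are dotted attribute paths into NodeData; a minimal resolver could look like the sketch below (hypothetical, the repository's actual serializer may differ):

def resolve_path(obj, path):
    # Follow a dotted path such as "proc.seq_cpu_usage" attribute by attribute.
    for part in path.split("."):
        obj = getattr(obj, part)
    return obj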