Commit 6a3d07a4 authored by Azat Khuziyakhmetov
Browse files

added infiniband max and sequence data

parent da9aa649
......@@ -16,6 +16,7 @@ measurements = {
"node": 'pfit-nodeinfo',
"sys": 'system',
"cpu": 'cpu',
"infiniband": 'infiniband',
}
# DB
......@@ -30,5 +31,8 @@ job_info = {
# Batch system
BATCH_SYSTEM = "SLURM" # LSF, SLURM
# Network
INFINIBAND = True # collect infiniband data
# PDF report
SEQ_MAX_POINTS = 500
......@@ -61,14 +61,20 @@ class NodeData:
seq_cpu_usage = None
seq_load_avg = None
seq_load_max = None
seq_ib_rcv_max = None
seq_ib_xmit_max = None
proc = None
alloc_cu = None
ib_rcv_max = None
ib_xmit_max = None
def __init__(self):
self.proc = ProcData()
self.seq_load_avg = SeqVals()
self.seq_load_max = SeqVals()
self.seq_cpu_usage = SeqVals()
self.seq_ib_rcv_max = SeqVals()
self.seq_ib_xmit_max = SeqVals()
class Aggregator:
......
......@@ -8,7 +8,7 @@ from . import metrics
m_node = conf.measurements["node"]
def get_node_query(node_id):
def get_nodeinfo_query(node_id):
random_field = metrics.metrics_node["sockets"]["dbname"]
query = (
'SELECT last({:s}), * FROM "{:s}" '
......@@ -16,14 +16,30 @@ def get_node_query(node_id):
).format(random_field, m_node, node_id)
return query
def get_node_queries(nodes):
def get_query_max_of_node(node_id, msrmnt, metric, metric_db, t_start, t_end):
    """Build the query selecting the maximum of one metric for one host.

    The resulting statement takes ``max(metric_db)`` from the measurement
    ``msrmnt``, exposes it under the alias ``metric``, restricts rows to the
    closed time window ``[t_start, t_end]`` and to the host ``node_id``, and
    groups the result by ``"host"``.

    Args:
        node_id: Host name compared against the ``"host"`` tag (str).
        msrmnt: Name of the measurement to read from (str).
        metric: Alias given to the selected maximum (str).
        metric_db: Name of the field passed to ``max()`` (str).
        t_start: Lower time bound; must be an integer, rendered with an
            ``s`` suffix.
        t_end: Upper time bound; must be an integer, rendered with an
            ``s`` suffix.

    Returns:
        The query as a single string terminated by ``;``.

    Raises:
        ValueError: If a string argument is not a ``str`` or a time bound is
            not an integer (enforced by the ``:s`` / ``:d`` format specs).
    """
    # The three clauses are kept separate so each part of the statement
    # can be read on its own; concatenated they form one template.
    select_clause = 'SELECT max({field:s}) as "{alias:s}" FROM "{table:s}" '
    time_clause = 'WHERE time >= {begin:d}s AND time <= {end:d}s AND '
    host_clause = '"host" = \'{host:s}\' GROUP BY "host";'
    template = select_clause + time_clause + host_clause

    return template.format(
        field=metric_db,
        alias=metric,
        table=msrmnt,
        begin=t_start,
        end=t_end,
        host=node_id,
    )
def get_node_queries(nodes, t_start, t_end):
queries = ""
for n in nodes:
q = get_node_query(n)
q = get_nodeinfo_query(n)
queries += q
for m, par in metrics.metrics_node.items():
if par["measurement"] == conf.measurements["infiniband"] and conf.INFINIBAND is False:
continue
if par["query"] is metrics.QType.MAX_OF_NODE:
q = get_query_max_of_node(n, par["measurement"], m, par["dbname"], t_start, t_end)
queries += q
return queries
def parse_node_response(data=None):
......@@ -46,19 +62,25 @@ def format_node_info(nodeinfo_init):
for n in nodeinfo_init:
for name, par in metrics.metrics_node.items():
raw_value = nodeinfo_init[n].get(par["dbname"], None)
raw_value = nodeinfo_init[n].get(par["dbname"], nodeinfo_init[n].get(name, None))
value = common.format_metric(metrics.metrics_node, name, raw_value)
nodeinfo[n][name] = value
nodeinfo[n]["phys_thr_core"] = 1
nodeinfo[n]["virt_thr_core"] -= 1
# Multiply infiniband metrics by 4 if exist. To get bytes
if "ib_rcv_max" in nodeinfo[n] and nodeinfo[n]["ib_rcv_max"] is not None:
nodeinfo[n]["ib_rcv_max"] *= 4
if "ib_xmit_max" in nodeinfo[n] and nodeinfo[n]["ib_xmit_max"] is not None:
nodeinfo[n]["ib_xmit_max"] *= 4
return nodeinfo
def get_node_data(nodes):
def get_node_data(nodes, t_start, t_end):
nodes = set(nodes)
queries = get_node_queries(nodes)
queries = get_node_queries(nodes, t_start, t_end)
nodedata_raw = common.fetch_data(queries)
......
......@@ -61,6 +61,10 @@ def get_seq_queries(job_id, t_start, t_end, delta, nodes):
queries = ""
for m, par in metrics.metrics_seq.items():
# skip generating infiniband queries if not activated
if par["measurement"] == conf.measurements["infiniband"] and conf.INFINIBAND is False:
continue
if par["query"] is metrics.QType.SUM_PER_NODE:
q = get_query_sum_per_node_of_job(
m, job_id, par["dbname"], par["measurement"], delta, t_start, t_end)
......@@ -124,7 +128,6 @@ def get_seq_data(job_id, t_start, t_end, nodes):
delta = max(delta, conf.METRIC_INTERVAL)
query = get_seq_queries(job_id, t_start, t_end, delta, nodes)
if conf.DEBUG:
print(query)
......
import conf.config as conf
from .influxdb_fetchjob import get_job_data
from .influxdb_fetchproc import get_proc_data
from .influxdb_fetchnode import get_node_data
......@@ -7,15 +8,16 @@ from ..aggrstruct import *
def fetch_all(job_id, type):
data = {}
data["job"], data["alloc_cu"] = get_job_data(job_id)
t_start = data["job"]["start_time"]
t_end = t_start + data["job"]["run_time"]
data["proc"] = get_proc_data(job_id)
node_ids = list(data["proc"].keys())
data["node"] = get_node_data(node_ids)
data["node"] = get_node_data(node_ids, t_start, t_end)
if type == "pdf":
t_start = data["job"]["start_time"]
t_end = t_start + data["job"]["run_time"]
data["seq"], data["seq_delta"] = get_seq_data(job_id, t_start, t_end, node_ids)
return data
......@@ -55,6 +57,9 @@ def get_aggregator(job_id, type="text"):
new_node.phys_thr_core = noded["phys_thr_core"]
new_node.main_mem = noded["main_mem"]
new_node.alloc_cu = cud[node_id]
if conf.INFINIBAND is True:
new_node.ib_rcv_max = noded["ib_rcv_max"]
new_node.ib_xmit_max = noded["ib_xmit_max"]
procd = data["proc"][node_id]
......@@ -83,6 +88,13 @@ def get_aggregator(job_id, type="text"):
new_node.seq_load_avg = SeqVals(data["seq_delta"], data["seq"][node_id]["node_load_avg"])
new_node.seq_load_max = SeqVals(data["seq_delta"], data["seq"][node_id]["node_load_max"])
if conf.INFINIBAND is True:
new_node.seq_ib_rcv_max = SeqVals(data["seq_delta"], data["seq"][node_id]["node_ib_rcv_max"])
new_node.seq_ib_xmit_max = SeqVals(data["seq_delta"], data["seq"][node_id]["node_ib_xmit_max"])
# Multiply values by 4 in order to get bytes
new_node.seq_ib_rcv_max.seq = [None if x is None else (4 * x) for x in new_node.seq_ib_rcv_max.seq]
new_node.seq_ib_xmit_max.seq = [None if x is None else (4 * x) for x in new_node.seq_ib_xmit_max.seq]
aggr.nodes.append(new_node)
return aggr
......@@ -2,6 +2,7 @@ from enum import Enum
from conf import config as conf
class QType(Enum):
CUSTOM = 0
SUM_OF_MAX = 1
MAX_SUM_PER_INT = 2
AVG_SUM_PER_INT = 3
......@@ -109,24 +110,48 @@ metrics_job = {
metrics_node = {
"cpu_model": {
"dbname": "cpu_model",
"measurement": conf.measurements["node"],
"query": QType.CUSTOM,
"type": MType.STR},
"sockets": {
"dbname": "sockets",
"measurement": conf.measurements["node"],
"query": QType.CUSTOM,
"type": MType.INT},
"cores_per_socket": {
"dbname": "cores_per_socket",
"measurement": conf.measurements["node"],
"query": QType.CUSTOM,
"type": MType.INT},
"phys_thr_core": {
"dbname": "threads_per_core",
"measurement": conf.measurements["node"],
"query": QType.CUSTOM,
"type": MType.INT},
"virt_thr_core": {
"dbname": "threads_per_core",
"measurement": conf.measurements["node"],
"query": QType.CUSTOM,
"type": MType.INT},
"main_mem": {
"dbname": "main_mem",
"measurement": conf.measurements["node"],
"query": QType.CUSTOM,
"type": MType.INT},
"alloc_cu": {
"dbname": "alloc_cu",
"measurement": conf.measurements["node"],
"query": QType.CUSTOM,
"type": MType.INT},
"ib_xmit_max": {
"dbname": "PortXmitData",
"measurement": conf.measurements["infiniband"],
"query": QType.MAX_OF_NODE,
"type": MType.INT},
"ib_rcv_max": {
"dbname": "PortRcvData",
"measurement": conf.measurements["infiniband"],
"query": QType.MAX_OF_NODE,
"type": MType.INT},
}
......@@ -179,4 +204,16 @@ metrics_seq = {
"query": QType.MAX_OF_NODE,
"measurement": conf.measurements["sys"],
},
"node_ib_xmit_max": {
"dbname": "PortXmitData",
"type": MType.INT,
"query": QType.MAX_OF_NODE,
"measurement": conf.measurements["infiniband"],
},
"node_ib_rcv_max": {
"dbname": "PortRcvData",
"type": MType.INT,
"query": QType.MAX_OF_NODE,
"measurement": conf.measurements["infiniband"],
},
}
......@@ -39,6 +39,8 @@ class OutputSchemes:
"phys_threads_per_core": "phys_thr_core",
"virt_threads_per_core": "virt_thr_core",
"available_main_mem": "main_mem",
"ib_rcv_max": "ib_rcv_max",
"ib_xmit_max": "ib_xmit_max",
"num_cores": "alloc_cu",
}
}
......@@ -85,6 +87,8 @@ class OutputSchemes:
"phys_threads_per_core": "phys_thr_core",
"virt_threads_per_core": "virt_thr_core",
"available_main_mem": "main_mem",
"ib_rcv_max": "ib_rcv_max",
"ib_xmit_max": "ib_xmit_max",
"num_cores": "alloc_cu",
"dynamic": {
"node_cpu_usage": {
......@@ -97,6 +101,16 @@ class OutputSchemes:
"path": "seq_load_avg",
"scheme": "std"
},
"node_ib_rcv_max": {
"type": "seq",
"path": "seq_ib_rcv_max",
"scheme": "std"
},
"node_ib_xmit_max": {
"type": "seq",
"path": "seq_ib_xmit_max",
"scheme": "std"
},
"proc_cpu_usage": {
"type": "seq",
"path": "proc.seq_cpu_usage",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment