Commit 0a27d826 authored by Azat Khuziyakhmetov's avatar Azat Khuziyakhmetov
Browse files

added 2 more series for pdf report

parent fa051d67
......@@ -14,6 +14,8 @@ measurements = {
"proc": 'pfit-uprocstat',
"jobs": 'pfit-jobinfo',
"node": 'pfit-nodeinfo',
"sys": 'system',
"cpu": 'cpu',
}
# DB
......
......@@ -9,7 +9,8 @@ from . import metrics
m_proc = conf.measurements["proc"]
def get_query_per_node(aggr, metric, job_id, metric_db, measurement, delta, t_start, t_end):
def get_query_aggr_per_node_for_job(aggr, metric, job_id, metric_db, measurement, delta, t_start, t_end):
aggr = '{:s}("{:s}") as "{:s}"'.format(aggr, metric_db, metric)
query = (
......@@ -21,37 +22,39 @@ def get_query_per_node(aggr, metric, job_id, metric_db, measurement, delta, t_st
return query
def get_query_aggr_for_nodes(aggr, metric, nodes, metric_db, measurement, delta, t_start, t_end):
aggr = '{:s}("{:s}") as "{:s}"'.format(aggr, metric_db, metric)
def get_query_sum_per_interval(metric, job_id, aggr, metric_db):
interval = conf.METRIC_INTERVAL
aggr_outer = '{:s}("{:s}_aggr") as "{:s}"'.format(aggr, metric, metric)
aggr_inner = 'sum("{:s}") as "{:s}_aggr"'.format(metric_db, metric)
node_cond = ' OR '.join("host='{:s}'".format(n) for n in nodes)
query = (
'SELECT {:s} FROM ('
'SELECT {:s} '
'FROM "{:s}" WHERE "jobid1" = \'{:s}\' '
'GROUP BY "host", time({:d}s)) GROUP BY "host";'
).format(aggr_outer, aggr_inner, m_proc, job_id, interval)
'SELECT {:s} FROM "{:s}" '
'WHERE ({:s}) '
'AND time >= {:d}s AND time <= {:d}s '
'GROUP BY "host", time({:d}s);'
).format(aggr, measurement, node_cond, t_start, t_end, delta)
return query
def get_query_sum_per_node(metric, job_id, metric_db, measurement, delta, t_start, t_end):
return get_query_per_node("sum", metric, job_id, metric_db, measurement, delta, t_start, t_end)
def get_query_sum_per_node_of_job(metric, job_id, metric_db, measurement, delta, t_start, t_end):
return get_query_aggr_per_node_for_job("sum", metric, job_id, metric_db, measurement, delta, t_start, t_end)
def get_query_avg_of_sum_per_interval(metric, job_id, metric_db):
return get_query_sum_per_interval(metric, job_id, "mean", metric_db)
def get_query_avg_of_node(metric, nodes, metric_db, measurement, delta, t_start, t_end):
return get_query_aggr_for_nodes("mean", metric, nodes, metric_db, measurement, delta, t_start, t_end)
def get_seq_queries(job_id, t_start, t_end, delta):
def get_seq_queries(job_id, t_start, t_end, delta, nodes):
queries = ""
for m, par in metrics.metrics_seq.items():
if par["query"] is metrics.QType.SUM_PER_NODE:
q = get_query_sum_per_node(m, job_id, par["dbname"], par["measurement"], delta, t_start, t_end)
q = get_query_sum_per_node_of_job(
m, job_id, par["dbname"], par["measurement"], delta, t_start, t_end)
elif par["query"] is metrics.QType.AVG_OF_NODE:
q = get_query_avg_of_node(
m, nodes, par["dbname"], par["measurement"], delta, t_start, t_end)
else:
raise ValueError("Unknown query type: {:s}".format(par["query"]))
......@@ -81,7 +84,8 @@ def parse_seq_response(data=None):
host = serie['tags']['host']
nodes[host][metric] = []
for v in range(0, len(serie['values'])):
nodes[host][metric].append(format_seq_val(metric, serie['values'][v][1]))
nodes[host][metric].append(
format_seq_val(metric, serie['values'][v][1]))
return nodes
......@@ -100,19 +104,24 @@ def format_data(seqinfo_init):
return seqinfo
def get_seq_data(job_id, t_start, t_end):
def get_seq_data(job_id, t_start, t_end, nodes):
delta = math.ceil((t_end - t_start) / conf.SEQ_MAX_POINTS)
delta = max(delta, conf.METRIC_INTERVAL)
query = get_seq_queries(job_id, t_start, t_end, delta)
query = get_seq_queries(job_id, t_start, t_end, delta, nodes)
if conf.DEBUG:
print(query)
seqdata_raw = common.fetch_data(query)
seqdata_parsed = parse_seq_response(seqdata_raw)
if conf.DEBUG:
print(query)
for host in seqdata_parsed:
for metric in seqdata_parsed[host]:
print(host, metric, len(seqdata_parsed[host][metric]), str(seqdata_parsed[host][metric])[:100])
print(host, metric, len(seqdata_parsed[host][metric]), str(
seqdata_parsed[host][metric])[:100])
return seqdata_parsed, delta
......@@ -16,7 +16,7 @@ def fetch_all(job_id, type):
if type == "pdf":
t_start = data["job"]["start_time"]
t_end = t_start + data["job"]["run_time"]
data["seq"], data["seq_delta"] = get_seq_data(job_id, t_start, t_end)
data["seq"], data["seq_delta"] = get_seq_data(job_id, t_start, t_end, node_ids)
return data
......@@ -74,6 +74,8 @@ def get_aggregator(job_id, type="ascii"):
if type == "pdf":
new_node.proc.seq_cpu_usage = SeqVals(data["seq_delta"], data["seq"][node_id]["proc_cpu_usage_sum"])
new_node.proc.seq_mem_rss_sum = SeqVals(data["seq_delta"], data["seq"][node_id]["proc_mem_rss_sum"])
new_node.seq_cpu_usage = SeqVals(data["seq_delta"], data["seq"][node_id]["node_cpu_usage_avg"])
new_node.seq_load = SeqVals(data["seq_delta"], data["seq"][node_id]["node_load_avg"])
aggr.nodes.append(new_node)
......
......@@ -6,6 +6,7 @@ class QType(Enum):
MAX_SUM_PER_INT = 2
AVG_SUM_PER_INT = 3
SUM_PER_NODE = 4
AVG_OF_NODE = 5
class MType(Enum):
......@@ -141,4 +142,16 @@ metrics_seq = {
"query": QType.SUM_PER_NODE,
"measurement": conf.measurements["proc"],
},
"node_cpu_usage_avg": {
"dbname": "usage_user",
"type": MType.FLT,
"query": QType.AVG_OF_NODE,
"measurement": conf.measurements["cpu"],
},
"node_load_avg": {
"dbname": "load1",
"type": MType.FLT,
"query": QType.AVG_OF_NODE,
"measurement": conf.measurements["sys"],
},
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment