Commit 2c6147d9 authored by Azat Khuziyakhmetov's avatar Azat Khuziyakhmetov
Browse files

now supports output of sequences

parent e7854d57
...@@ -27,3 +27,6 @@ job_info = { ...@@ -27,3 +27,6 @@ job_info = {
# Batch system # Batch system
BATCH_SYSTEM = "SLURM" # LSF, SLURM BATCH_SYSTEM = "SLURM" # LSF, SLURM
# PDF report
# Upper bound on the number of points per metric sequence in the PDF
# report; the fetcher derives its sampling interval (delta) from this.
SEQ_MAX_POINTS = 500
class SeqVals:
    """A sampled metric sequence together with its sampling interval.

    Attributes:
        delta: sampling interval in seconds between consecutive points,
               or None when unknown.
        seq: list of sampled values, or None when no data was fetched.
    """

    def __init__(self, delta=None, seq=None):
        # Instance attributes only. The previous class-level `seq = []`
        # was a shared mutable default: every instance that mutated it
        # (or any code reading SeqVals.seq) saw the same list. __init__
        # always overwrote it, so removing it preserves instance behavior.
        self.delta = delta
        self.seq = seq
class ProcData: class ProcData:
cpu_time_user = None cpu_time_user = None
cpu_time_system = None cpu_time_system = None
...@@ -12,6 +19,8 @@ class ProcData: ...@@ -12,6 +19,8 @@ class ProcData:
mem_swap_max = None mem_swap_max = None
mem_vms = None mem_vms = None
cpu_usage = None cpu_usage = None
seq_cpu_usage = SeqVals()
seq_mem_rss_sum = SeqVals()
class AllocCUData: class AllocCUData:
node_id = None node_id = None
...@@ -41,19 +50,10 @@ class NodeData: ...@@ -41,19 +50,10 @@ class NodeData:
virt_thr_core = None virt_thr_core = None
phys_thr_core = None phys_thr_core = None
main_mem = None main_mem = None
seq_cpu_usage = SeqVals()
seq_load = SeqVals()
proc = ProcData() proc = ProcData()
class ProcSeq:
    """Per-process metric sequences (averaged values per interval)."""

    def __init__(self):
        # Instance lists instead of class-level ones: class attributes
        # holding lists are shared (and mutated) across all instances.
        self.cpu_time_user_avg = []
        self.cpu_time_sys_avg = []
        self.mem_rss_avg = []
class NodeSeq:
    """Per-node metric sequences."""

    def __init__(self):
        # Instance lists instead of class-level ones: class attributes
        # holding lists are shared (and mutated) across all instances.
        self.cpu_usage = []
        self.mem_usage = []
class Aggregator:
    """Top-level container for all data collected about one job."""
    # NOTE(review): class-level mutable defaults are shared across
    # instances; callers appear to use Aggregator as a single record,
    # so the original attribute layout is kept as-is.
    job = JobData()
    nodes = [NodeData()]
    proc_seq = ProcSeq()
    node_seq = NodeSeq()
...@@ -23,16 +23,19 @@ def fetch_data(query): ...@@ -23,16 +23,19 @@ def fetch_data(query):
# format metric # format metric
def format_metric(metrics_dict, metric, value):
    """Format `value` according to the type declared for `metric`.

    Thin wrapper that looks up the metric's declared type in
    `metrics_dict` and delegates to format_value().
    """
    # `mtype` instead of `type`: avoid shadowing the builtin.
    mtype = metrics_dict[metric]["type"]
    return format_value(mtype, value)
def format_value(fmt, value):
    """Convert `value` to the Python type named by the MType `fmt`.

    Returns None for missing values and for unknown format types.
    """
    if value is None:
        return None
    if fmt is metrics.MType.INT:
        # Parse via float() first so numeric strings like "3.0"
        # convert cleanly before truncation to int.
        return int(float(value))
    if fmt is metrics.MType.STR:
        return str(value)
    if fmt is metrics.MType.FLT:
        return float(value)
    return None
# db/getinfluxdb.py gets metrics from the InfluxDB by jobID
# and returns parsed dictionary
import math
from collections import defaultdict
import conf.config as conf
from . import common
from . import metrics
# InfluxDB measurement holding the per-process metrics.
m_proc = conf.measurements["proc"]
def get_query_per_node(aggr, metric, job_id, metric_db, measurement, delta, t_start, t_end):
    """Build an InfluxDB query that aggregates `metric_db` per host.

    Values are grouped into `delta`-second time buckets between
    `t_start` and `t_end` (unix seconds) and aliased to `metric`.
    """
    select_expr = '{:s}("{:s}") as "{:s}"'.format(aggr, metric_db, metric)
    template = (
        'SELECT {:s} FROM "{:s}" '
        'WHERE "jobid1" = \'{:s}\' '
        'AND time >= {:d}s AND time <= {:d}s '
        'GROUP BY "host", time({:d}s);'
    )
    return template.format(select_expr, measurement, job_id, t_start, t_end, delta)
def get_query_sum_per_interval(metric, job_id, aggr, metric_db):
    """Build a nested InfluxDB query over the per-process measurement.

    The inner query sums `metric_db` per host per METRIC_INTERVAL bucket;
    the outer query applies `aggr` (e.g. "mean", "max") over those sums,
    grouped by host.
    """
    interval = conf.METRIC_INTERVAL
    outer_expr = '{:s}("{:s}_aggr") as "{:s}"'.format(aggr, metric, metric)
    inner_expr = 'sum("{:s}") as "{:s}_aggr"'.format(metric_db, metric)
    template = (
        'SELECT {:s} FROM ('
        'SELECT {:s} '
        'FROM "{:s}" WHERE "jobid1" = \'{:s}\' '
        'GROUP BY "host", time({:d}s)) GROUP BY "host";'
    )
    return template.format(outer_expr, inner_expr, m_proc, job_id, interval)
def get_query_sum_per_node(metric, job_id, metric_db, measurement, delta, t_start, t_end):
    """Per-node sum of `metric_db` in `delta`-second buckets."""
    return get_query_per_node(
        "sum", metric, job_id, metric_db, measurement, delta, t_start, t_end)
def get_query_avg_of_sum_per_interval(metric, job_id, metric_db):
    """Mean (over intervals, per host) of the per-interval sums of `metric_db`."""
    return get_query_sum_per_interval(metric, job_id, "mean", metric_db)
def get_seq_queries(job_id, t_start, t_end, delta):
    """Build the concatenated InfluxDB queries for all sequence metrics.

    Raises:
        ValueError: if a metric in metrics_seq declares an unsupported
            query type.
    """
    queries = ""
    for name, par in metrics.metrics_seq.items():
        if par["query"] is metrics.QType.SUM_PER_NODE:
            queries += get_query_sum_per_node(
                name, job_id, par["dbname"], par["measurement"],
                delta, t_start, t_end)
        else:
            # Bug fix: the original used "{:s}", which raises TypeError for
            # Enum members ("unsupported format string passed to ..."),
            # masking the intended ValueError. "{}" uses str() and works.
            raise ValueError("Unknown query type: {}".format(par["query"]))
    return queries
def format_seq_val(metric, val):
    """Format one sequence value using the metric's declared type.

    Values of metrics not listed in metrics_seq pass through unchanged.
    """
    if metric in metrics.metrics_seq:
        return common.format_value(metrics.metrics_seq[metric]["type"], val)
    return val
def parse_seq_response(data=None):
    """Parse raw InfluxDB results into {host: {metric: [values]}}.

    `data` is the list of statement results returned by the database;
    each result carries a "series" list whose entries hold the metric
    name (second column), the host tag and the (time, value) rows.

    A None/empty `data` yields an empty mapping (the original crashed
    with TypeError on the default None).

    Raises:
        RuntimeError: when a statement produced no series at all.
    """
    nodes = defaultdict(dict)
    for result in (data or []):
        if "series" not in result:
            raise RuntimeError("Some queries were not executed", result)
        for serie in result['series']:
            metric = serie['columns'][1]
            host = serie['tags']['host']
            # Each row is (timestamp, value); keep only the formatted value.
            nodes[host][metric] = [
                format_seq_val(metric, row[1]) for row in serie['values']
            ]
    return nodes
def format_data(seqinfo_init):
    """Format every value of every per-node sequence by its metric type.

    `seqinfo_init` maps host -> {metric_name: [raw values]}; the result
    has the same shape with each value converted via the metric's "type".

    Bug fixes vs. the original: items() was unpacked as (node, metric)
    and the per-node dict tested against metrics_seq; `name` was never
    defined; enumerate()'s (index, value) pair was bound in swapped
    order; the whole metric spec (not its "type") was passed to
    format_value; and values were assigned by index into a list that was
    never created. The original could not run.
    """
    seqinfo = defaultdict(dict)
    for node, metric_map in seqinfo_init.items():
        for name, values in metric_map.items():
            if name not in metrics.metrics_seq:
                continue
            mtype = metrics.metrics_seq[name]["type"]
            seqinfo[node][name] = [common.format_value(mtype, v) for v in values]
    return seqinfo
def get_seq_data(job_id, t_start, t_end):
    """Fetch all per-node metric sequences for one job.

    The bucket width `delta` is chosen so that at most
    conf.SEQ_MAX_POINTS points come back per metric, but never smaller
    than the raw sampling interval. Returns (parsed_sequences, delta).
    """
    span = t_end - t_start
    delta = max(math.ceil(span / conf.SEQ_MAX_POINTS), conf.METRIC_INTERVAL)

    query = get_seq_queries(job_id, t_start, t_end, delta)
    parsed = parse_seq_response(common.fetch_data(query))

    if conf.DEBUG:
        print(query)
        for host, metric_map in parsed.items():
            for metric, values in metric_map.items():
                print(host, metric, len(values), str(values)[:100])

    return parsed, delta
from .influxdb_fetchjob import get_job_data from .influxdb_fetchjob import get_job_data
from .influxdb_fetchproc import get_proc_data from .influxdb_fetchproc import get_proc_data
from .influxdb_fetchnode import get_node_data from .influxdb_fetchnode import get_node_data
from .influxdb_fetchseq import get_seq_data
from ..aggrstruct import * from ..aggrstruct import *
def fetch_all(job_id, type): def fetch_all(job_id, type):
...@@ -13,7 +14,9 @@ def fetch_all(job_id, type): ...@@ -13,7 +14,9 @@ def fetch_all(job_id, type):
data["node"] = get_node_data(node_ids) data["node"] = get_node_data(node_ids)
if type == "pdf": if type == "pdf":
""" fetch something moret """ t_start = data["job"]["start_time"]
t_end = data["job"]["end_time"]
data["seq"], data["seq_delta"] = get_seq_data(job_id, t_start, t_end)
return data return data
...@@ -68,6 +71,9 @@ def get_aggregator(job_id, type="ascii"): ...@@ -68,6 +71,9 @@ def get_aggregator(job_id, type="ascii"):
new_node.proc.mem_vms = procd["mem_vms"] new_node.proc.mem_vms = procd["mem_vms"]
new_node.proc.cpu_usage = procd["cpu_usage"] new_node.proc.cpu_usage = procd["cpu_usage"]
new_node.proc.seq_cpu_usage = SeqVals(data["seq_delta"], data["seq"][node_id]["proc_cpu_usage_sum"])
new_node.proc.seq_mem_rss_sum = SeqVals(data["seq_delta"], data["seq"][node_id]["proc_mem_rss_sum"])
aggr.nodes.append(new_node) aggr.nodes.append(new_node)
return aggr return aggr
from enum import Enum from enum import Enum
from conf import config as conf
class QType(Enum):
    """How a metric is aggregated when queried from the database."""
    SUM_OF_MAX = 1
    MAX_SUM_PER_INT = 2
    AVG_SUM_PER_INT = 3
    SUM_PER_NODE = 4
class MType(Enum): class MType(Enum):
...@@ -126,3 +127,18 @@ metrics_node = { ...@@ -126,3 +127,18 @@ metrics_node = {
"dbname": "alloc_cu", "dbname": "alloc_cu",
"type": MType.INT}, "type": MType.INT},
} }
# Sequence (time-series) metrics fetched for the PDF report.
#   dbname:      field name in the InfluxDB measurement
#   type:        how raw values are formatted (see MType)
#   query:       aggregation strategy (see QType)
#   measurement: InfluxDB measurement the field is read from
metrics_seq = {
    "proc_cpu_usage_sum": {
        "dbname": "cpu_usage",
        "type": MType.FLT,
        "query": QType.SUM_PER_NODE,
        "measurement": conf.measurements["proc"],
    },
    "proc_mem_rss_sum": {
        "dbname": "memory_rss",
        "type": MType.FLT,
        "query": QType.SUM_PER_NODE,
        "measurement": conf.measurements["proc"],
    },
}
...@@ -52,14 +52,21 @@ def fill_scheme(agg, scheme, path, **ids): ...@@ -52,14 +52,21 @@ def fill_scheme(agg, scheme, path, **ids):
def format_scheme(agg, sstruc, path="", **ids):
    """Render one scheme node against aggregator `agg`.

    Supported node types:
      "dict" -- fill the nested scheme once;
      "list" -- fill the nested scheme once per element of the
                aggregator list at this path;
      "seq"  -- emit a sampled sequence as {"delta": ..., "data": ...}.

    Raises:
        ValueError: for an unknown node type (the original fell through
            and raised UnboundLocalError on `result` instead).
    """
    stype = sstruc["type"]
    full_path = build_path(path, sstruc["path"])
    if stype == "dict":
        result = fill_scheme(agg, sstruc["scheme"], full_path, **ids)
    elif stype == "list":
        result = []
        for fkey, _ in enumerate(get_value(agg, full_path, **ids)):
            # Rebind ids so nested fills index into the current element.
            ids = {sstruc["path"]: fkey}
            result.append(fill_scheme(agg, sstruc["scheme"], full_path, **ids))
    elif stype == "seq":
        seq_data = get_value(agg, full_path, **ids)
        result = {
            "delta": seq_data.delta,
            "data": seq_data.seq,
        }
    else:
        raise ValueError("Unknown scheme type: {}".format(stype))
    return result
...@@ -74,7 +81,7 @@ def format_ascii(agg): ...@@ -74,7 +81,7 @@ def format_ascii(agg):
return result return result
def format_pdf(agg): def format_pdf(agg):
scheme = OutputSchemes.ascii scheme = OutputSchemes.pdf
result = {} result = {}
result = format_scheme(agg, scheme) result = format_scheme(agg, scheme)
......
...@@ -46,5 +46,69 @@ class OutputSchemes: ...@@ -46,5 +46,69 @@ class OutputSchemes:
} }
# Output scheme for the PDF report: maps output keys to aggregator
# attribute paths. Nested "type" entries ("dict"/"list"/"seq") are
# interpreted by format_scheme().
pdf = {
    "type": "dict",
    "path": "",
    "scheme": {
        "job": {
            "job_id": "job.id",
            "user_name": "job.user_name",
            "used_queue": "job.used_queue",
            "submit_time": "job.submit_time",
            "start_time": "job.start_time",
            "end_time": "job.end_time",
            "used_time": "job.run_time",
            "requested_time": "job.requested_time",
            "requested_cu": "job.requested_cu",
            "num_nodes": "job.num_nodes",
        },
        "nodes": {
            "type": "list",
            "path": "nodes",
            "scheme": {
                "node_name": "name",
                "cpu_time_user": "proc.cpu_time_user",
                "cpu_time_system": "proc.cpu_time_system",
                "cpu_time_idle": "proc.cpu_time_idle",
                "cpu_time_iowait": "proc.cpu_time_iowait",
                "write_bytes": "proc.write_bytes",
                "write_count": "proc.write_count",
                "read_bytes": "proc.read_bytes",
                "read_count": "proc.read_count",
                "mem_rss_max": "proc.mem_rss_max",
                "mem_rss_avg": "proc.mem_rss_avg",
                "mem_swap_max": "proc.mem_swap_max",
                "mem_vms": "proc.mem_vms",
                "cpu_usage": "proc.cpu_usage",
                "cpu_model": "cpu_model",
                "sockets": "sockets",
                "cores_per_socket": "cores_per_socket",
                "phys_threads_per_core": "phys_thr_core",
                "virt_threads_per_core": "virt_thr_core",
                "available_main_mem": "main_mem",
                # NOTE(review): no aggregator path given for num_cores —
                # presumably filled elsewhere; confirm fill_scheme handles None.
                "num_cores": None,
                # Sampled sequences rendered as {"delta": ..., "data": ...}.
                "dynamic": {
                    "node_cpu_usage": {
                        "type": "seq",
                        "path": "seq_cpu_usage",
                        "scheme": "std"
                    },
                    "node_load": {
                        "type": "seq",
                        "path": "seq_load",
                        "scheme": "std"
                    },
                    "proc_cpu_usage": {
                        "type": "seq",
                        "path": "proc.seq_cpu_usage",
                        "scheme": "std"
                    },
                    "proc_mem_rss_sum": {
                        "type": "seq",
                        "path": "proc.seq_mem_rss_sum",
                        "scheme": "std"
                    },
                }
            }
        }
    }
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment