Commit b133e0d3 authored by Azat Khuziyakhmetov

Separation of Fetching, Aggregation, Manipulation and Formatting parts.

parent 1ba8da6c
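
This commit splits the tool into the four layers named in the message above. As a rough usage sketch (the job id is the one used in the test script below; this snippet is not part of the diff itself):

from db import aggregator
from format import format

aggr = aggregator.get_aggregator("2368599", "ascii")   # fetching + aggregation
output = format.format(aggr, "ascii")                  # manipulation + formatting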
from .influx import influxdb_wrapper


def get_aggregator(job_id, type="ascii"):
    aggr = influxdb_wrapper.get_aggregator(job_id, type)
    return aggr
# Plain data containers filled by the aggregator. Note that the defaults
# below are class-level attributes: mutable ones (lists, nested objects)
# are shared between instances until an instance assigns its own value.
class ProcData:
    cpu_time_user = None
    cpu_time_system = None
    cpu_time_idle = None
    cpu_time_iowait = None
    write_bytes = None
    write_count = None
    read_bytes = None
    read_count = None
    mem_rss_max = None
    mem_rss_avg = None
    mem_swap_max = None
    mem_vms = None
    cpu_usage = None


class AllocCUData:
    node_id = None
    cu_count = None

    def __init__(self, node=None, cu=None):
        self.node_id = node
        self.cu_count = cu


class JobData:
    id = None
    user_name = None
    used_queue = None
    submit_time = None
    start_time = None
    end_time = None
    run_time = None
    requested_time = None
    requested_cu = None
    num_nodes = None
    alloc_cu = [AllocCUData()]


class NodeData:
    name = None
    cpu_model = None
    sockets = None
    cores_per_socket = None
    virt_thr_core = None
    phys_thr_core = None
    main_mem = None
    proc = ProcData()


class ProcSeq:
    cpu_time_user_avg = []
    cpu_time_sys_avg = []
    mem_rss_avg = []


class NodeSeq:
    cpu_usage = []
    mem_usage = []


class Aggregator:
    job = JobData()
    nodes = [NodeData()]
    proc_seq = ProcSeq()
    node_seq = NodeSeq()
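
A short sketch of how these containers nest, with hypothetical values (the test script at the bottom of this commit fills them the same way):

# Sketch with hypothetical values: one job allocated on one node.
agg = Aggregator()
agg.job = JobData()
agg.job.id = "12345"
agg.job.alloc_cu = [AllocCUData("node01", 16)]

node = NodeData()
node.name = "node01"
node.proc = ProcData()      # per-instance copy instead of the shared default
node.proc.cpu_usage = 95
agg.nodes = [node]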
@@ -48,7 +48,7 @@ def format_job_info(jobinfo_init):
     for name, par in metrics.metrics_job.items():
         raw_value = jobinfo_init.get("last_{:s}".format(par["dbname"]), None)
         value = common.format_metric(metrics.metrics_job, name, raw_value)
-        jobinfo[par["apiname"]] = value
+        jobinfo[name] = value
     return jobinfo
@@ -14,7 +14,6 @@ def get_node_query(node_id):
         'SELECT last({:s}), * FROM "{:s}" '
         'WHERE "host" = \'{:s}\' GROUP BY "host";'
     ).format(random_field, m_node, node_id)
     return query


 def get_node_queries(nodes):
@@ -49,12 +48,10 @@ def format_node_info(nodeinfo_init):
         for name, par in metrics.metrics_node.items():
             raw_value = nodeinfo_init[n].get(par["dbname"], None)
             value = common.format_metric(metrics.metrics_node, name, raw_value)
-            nodeinfo[n][par["apiname"]] = value
+            nodeinfo[n][name] = value
-        phname = metrics.metrics_node["phys_thr_core"]["apiname"]
-        vrname = metrics.metrics_node["virt_thr_core"]["apiname"]
-        nodeinfo[n][phname] = 1
-        nodeinfo[n][vrname] -= 1
+        nodeinfo[n]["phys_thr_core"] = 1
+        nodeinfo[n]["virt_thr_core"] -= 1
     return nodeinfo
@@ -84,7 +84,7 @@ def format_data(procinfo_init):
         for name, par in metrics.metrics_proc.items():
             raw_value = vars.get(name, None)
             value = common.format_metric(metrics.metrics_proc, name, raw_value)
-            procinfo[node][par["apiname"]] = value
+            procinfo[node][name] = value
     return procinfo
from .influxdb_fetchjob import get_job_data
from .influxdb_fetchproc import get_proc_data
from .influxdb_fetchnode import get_node_data
from ..data import *


def fetch_all(job_id, type):
    data = {}
    data["job"], data["alloc_cu"] = get_job_data(job_id)
    data["proc"] = get_proc_data(job_id)
    node_ids = list(data["proc"].keys())
    data["node"] = get_node_data(node_ids)

    if type == "pdf":
        """ fetch something more """

    return data
def get_aggregator(job_id, type="ascii"):
    data = fetch_all(job_id, type)

    aggr = Aggregator()

    jd = data["job"]
    cud = data["alloc_cu"]
    aggr.job.id = job_id
    aggr.job.user_name = jd["user_name"]
    aggr.job.used_queue = jd["used_queue"]
    aggr.job.submit_time = jd["submit_time"]
    aggr.job.start_time = jd["start_time"]
    aggr.job.end_time = jd["end_time"]
    aggr.job.run_time = jd["run_time"]
    aggr.job.requested_time = jd["requested_time"]
    aggr.job.requested_cu = jd["requested_cu"]
    aggr.job.num_nodes = jd["num_nodes"]

    aggr.job.alloc_cu = []
    for node_id, cu_count in cud.items():
        aggr.job.alloc_cu.append(AllocCUData(node_id, cu_count))

    aggr.nodes = []
    for node_id, noded in data["node"].items():
        new_node = NodeData()
        new_node.name = node_id
        new_node.cpu_model = noded["cpu_model"]
        new_node.sockets = noded["sockets"]
        new_node.cores_per_socket = noded["cores_per_socket"]
        new_node.virt_thr_core = noded["virt_thr_core"]
        new_node.phys_thr_core = noded["phys_thr_core"]
        new_node.main_mem = noded["main_mem"]

        # Give each node its own ProcData: the class-level default in
        # NodeData is shared between instances, so mutating it in place
        # would leak the last node's values into every node.
        new_node.proc = ProcData()

        procd = data["proc"][node_id]
        new_node.proc.cpu_time_user = procd["cpu_time_user"]
        new_node.proc.cpu_time_system = procd["cpu_time_system"]
        new_node.proc.cpu_time_idle = procd["cpu_time_idle"]
        new_node.proc.cpu_time_iowait = procd["cpu_time_iowait"]
        new_node.proc.write_bytes = procd["write_bytes"]
        new_node.proc.write_count = procd["write_count"]
        new_node.proc.read_bytes = procd["read_bytes"]
        new_node.proc.read_count = procd["read_count"]
        new_node.proc.mem_rss_max = procd["mem_rss_max"]
        new_node.proc.mem_rss_avg = procd["mem_rss_avg"]
        new_node.proc.mem_swap_max = procd["mem_swap_max"]
        new_node.proc.mem_vms = procd["mem_vms"]
        new_node.proc.cpu_usage = procd["cpu_usage"]

        aggr.nodes.append(new_node)

    return aggr
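
The fetcher modules themselves are not part of this excerpt; from the lookups above, the shapes get_aggregator() assumes are roughly (a sketch inferred from the code, not confirmed by this diff):

# Assumed shapes, inferred from the key accesses in get_aggregator():
#   data["job"]      -> {"user_name": ..., "used_queue": ..., ...}  one dict per job
#   data["alloc_cu"] -> {node_id: cu_count}                         per allocated node
#   data["proc"]     -> {node_id: {"cpu_time_user": ..., ...}}      per-node process stats
#   data["node"]     -> {node_id: {"cpu_model": ..., ...}}          per-node hardware info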
@@ -17,67 +17,54 @@ class APInames():
 metrics_proc = {
     'cpu_time_user': {
-        "apiname": 'cpu_time_user',
         "dbname": 'cpu_time_user',
         "query": QType.SUM_OF_MAX,
         "type": MType.INT},
     'cpu_time_system': {
-        "apiname": 'cpu_time_system',
         "dbname": 'cpu_time_system',
         "query": QType.SUM_OF_MAX,
         "type": MType.INT},
     'cpu_time_idle': {
-        "apiname": 'cpu_time_idle',
         "dbname": 'cpu_time_idle',
         "query": QType.SUM_OF_MAX,
         "type": MType.INT},
     'cpu_time_iowait': {
-        "apiname": 'cpu_time_iowait',
         "dbname": 'cpu_time_iowait',
         "query": QType.SUM_OF_MAX,
         "type": MType.INT},
     'write_bytes': {
-        "apiname": 'write_bytes',
         "dbname": 'write_bytes',
         "query": QType.SUM_OF_MAX,
         "type": MType.INT},
     'write_count': {
-        "apiname": 'write_count',
         "dbname": 'write_count',
         "query": QType.SUM_OF_MAX,
         "type": MType.INT},
     'read_bytes': {
-        "apiname": 'read_bytes',
         "dbname": 'read_bytes',
         "query": QType.SUM_OF_MAX,
         "type": MType.INT},
     'read_count': {
-        "apiname": 'read_count',
         "dbname": 'read_count',
         "query": QType.SUM_OF_MAX,
         "type": MType.INT},
     'mem_rss_max': {
-        "apiname": 'mem_rss_max',
         "dbname": 'memory_rss',
         "query": QType.MAX_SUM_PER_INT,
         "type": MType.INT},
     'mem_rss_avg': {
-        "apiname": 'mem_rss_avg',
         "dbname": 'memory_rss',
         "query": QType.AVG_SUM_PER_INT,
         "type": MType.INT},
     'mem_swap_max': {
-        "apiname": 'mem_swap_max',
         "dbname": 'memory_swap',
         "query": QType.MAX_SUM_PER_INT,
         "type": MType.INT},
     'mem_vms': {
-        "apiname": 'mem_vms',
         "dbname": 'memory_vms',
         "query": QType.MAX_SUM_PER_INT,
         "type": MType.INT},
     'cpu_usage': {
-        "apiname": 'cpu_usage',
         "dbname": 'cpu_usage',
         "query": QType.MAX_SUM_PER_INT,
         "type": MType.INT},
@@ -85,74 +72,57 @@ metrics_proc = {
 metrics_job = {
     "user_name": {
-        "apiname": "user_name",
         "dbname": "user_name",
         "type": MType.STR},
     "used_queue": {
-        "apiname": "used_queue",
         "dbname": "used_queue",
         "type": MType.STR},
     "submit_time": {
-        "apiname": "submit_time",
         "dbname": "submit_time",
         "type": MType.INT},
     "start_time": {
-        "apiname": "start_time",
         "dbname": "start_time",
         "type": MType.INT},
     "end_time": {
-        "apiname": "end_time",
         "dbname": "end_time",
         "type": MType.INT},
     "run_time": {
-        "apiname": "used_time",
         "dbname": "run_time",
         "type": MType.INT},
     "requested_time": {
-        "apiname": "requested_time",
         "dbname": "requested_time",
         "type": MType.INT},
     "requested_cu": {
-        "apiname": "requested_cu",
         "dbname": "requested_cu",
         "type": MType.INT},
     "num_nodes": {
-        "apiname": "num_nodes",
         "dbname": "num_nodes",
         "type": MType.INT},
     "alloc_cu": {
-        "apiname": "alloc_cu",
         "dbname": "alloc_cu",
         "type": MType.STR},
 }

 metrics_node = {
     "cpu_model": {
-        "apiname": "cpu_model",
         "dbname": "cpu_model",
         "type": MType.STR},
     "sockets": {
-        "apiname": "sockets",
         "dbname": "sockets",
         "type": MType.INT},
     "cores_per_socket": {
-        "apiname": "cores_per_socket",
         "dbname": "cores_per_socket",
         "type": MType.INT},
     "phys_thr_core": {
-        "apiname": "phys_threads_per_core",
         "dbname": "threads_per_core",
         "type": MType.INT},
     "virt_thr_core": {
-        "apiname": "virt_threads_per_core",
         "dbname": "threads_per_core",
         "type": MType.INT},
     "main_mem": {
-        "apiname": "available_main_mem",
         "dbname": "main_mem",
         "type": MType.INT},
     "alloc_cu": {
-        "apiname": "num_cores",
         "dbname": "alloc_cu",
         "type": MType.INT},
 }
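
Each entry maps an internal metric name to its database field and value type; since this commit the formatters index results by the metric's dict key itself, so the "apiname" entries are dropped. A small hypothetical lookup showing how a node metric flows through (the node name and data are invented for illustration; the calls are the ones shown in the hunks above):

# Hypothetical: resolve the db field, format the raw value, store it
# under the metric key itself.
raw = nodeinfo_init["node01"].get(metrics.metrics_node["main_mem"]["dbname"], None)
val = common.format_metric(metrics.metrics_node, "main_mem", raw)
nodeinfo["node01"]["main_mem"] = val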
# This script contains the main function "format", which formats the data
# ready for output.
from db import data
from .formats_json import OutputSchemes


def get_value(agg, path, **ids):
    if path == "":
        return agg

    classes = path.split(".")
    recvar = agg
    for c in classes:
        if not hasattr(recvar, c):
            raise ValueError("scheme: {} from {} was not found".format(c, path))
        recvar = getattr(recvar, c)
        # a path segment listed in ids selects one element of a list attribute
        if c in ids:
            recvar = recvar[ids[c]]

    return recvar
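
get_value() resolves a dotted attribute path against the aggregator; when a segment names a list attribute, the keyword ids select which element to descend into. For example (hypothetical, assuming a filled Aggregator as built by db.aggregator):

# Hypothetical calls against a filled Aggregator:
# get_value(agg, "job.user_name")         -> agg.job.user_name
# get_value(agg, "nodes.proc", nodes=0)   -> agg.nodes[0].proc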
def build_path(prefix, suffix):
    path = suffix
    if prefix != "":
        path = "{}.{}".format(prefix, suffix)
    return path


def fill_scheme(agg, scheme, path, **ids):
    # fill all standard fields via simple copy
    result = {}
    for fkey, fval in scheme.items():
        if fval is None:
            continue
        if type(fval) is dict:
            if "scheme" in fval and "path" in fval:
                # nested sub-scheme with its own traversal rules
                fres = format_scheme(agg, fval, path, **ids)
            else:
                # plain dict: groups output keys at the same path
                fres = fill_scheme(agg, fval, path, **ids)
        elif type(fval) is str:
            fres = get_value(agg, build_path(path, fval), **ids)
        result[fkey] = fres

    return result


def format_scheme(agg, sstruc, path="", **ids):
    if sstruc["type"] == "dict":
        result = fill_scheme(agg, sstruc["scheme"],
                             build_path(path, sstruc["path"]), **ids)
    elif sstruc["type"] == "list":
        result = []
        for fkey, _ in enumerate(get_value(agg, build_path(path, sstruc["path"]), **ids)):
            ids = {sstruc["path"]: fkey}
            fres = fill_scheme(agg, sstruc["scheme"],
                               build_path(path, sstruc["path"]), **ids)
            result.append(fres)

    return result


def format_ascii(agg):
    scheme = OutputSchemes.ascii
    result = format_scheme(agg, scheme)
    result = format_ascii_nonstd(agg, result)
    return result


def format(agg, type="ascii"):
    # isinstance() avoids calling the builtin type(), which the "type"
    # parameter shadows here
    if not isinstance(agg, data.Aggregator):
        raise RuntimeError("No aggregator in the format method")

    if type == "ascii":
        return format_ascii(agg)
    elif type == "pdf":
        return format_pdf(agg)


def format_ascii_nonstd(agg, result):
    # TODO: format alloc_cu
    return result
class OutputSchemes:
    # String values are attribute paths into the Aggregator; None marks
    # fields that fill_scheme skips and leaves to the non-standard
    # formatting step (see format_ascii_nonstd).
    ascii = {
        "type": "dict",
        "path": "",
        "scheme": {
            "job_id": "job.id",
            "general": {
                "user_name": "job.user_name",
                "used_queue": "job.used_queue",
                "submit_time": "job.submit_time",
                "start_time": "job.start_time",
                "end_time": "job.end_time",
                "used_time": "job.run_time",
                "requested_time": "job.requested_time",
                "requested_cu": "job.requested_cu",
                "num_nodes": "job.num_nodes",
            },
            "nodes": {
                "type": "list",
                "path": "nodes",
                "scheme": {
                    "node_name": "name",
                    "cpu_time_user": "proc.cpu_time_user",
                    "cpu_time_system": "proc.cpu_time_system",
                    "cpu_time_idle": "proc.cpu_time_idle",
                    "cpu_time_iowait": "proc.cpu_time_iowait",
                    "write_bytes": "proc.write_bytes",
                    "write_count": "proc.write_count",
                    "read_bytes": "proc.read_bytes",
                    "read_count": "proc.read_count",
                    "mem_rss_max": "proc.mem_rss_max",
                    "mem_rss_avg": "proc.mem_rss_avg",
                    "mem_swap_max": "proc.mem_swap_max",
                    "mem_vms": "proc.mem_vms",
                    "cpu_usage": "proc.cpu_usage",
                    "cpu_model": "cpu_model",
                    "sockets": "sockets",
                    "cores_per_socket": "cores_per_socket",
                    "phys_threads_per_core": None,
                    "virt_threads_per_core": None,
                    "available_main_mem": "main_mem",
                    "num_cores": None,
                }
            }
        }
    }

    pdf = {
    }
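
As format_scheme() above consumes it: "type" selects dict or list traversal, "path" is an attribute path relative to the current position, string leaves are copied via get_value(), plain nested dicts merely group output keys, and None leaves are deferred to the non-standard formatting step. A minimal hypothetical scheme:

# Hypothetical minimal scheme: copies two job fields into a flat dict.
minimal = {
    "type": "dict",
    "path": "",
    "scheme": {
        "job_id": "job.id",
        "owner": "job.user_name",
    },
}
# format_scheme(agg, minimal)  ->  {"job_id": ..., "owner": ...}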
from pprint import pprint

from db import data
from db import aggregator
from format import format

agg = data.Aggregator()

# fill some random data
agg.job = data.JobData()
for f in dir(agg.job):
    if not f.startswith('__') and not callable(getattr(agg.job, f)):
        setattr(agg.job, f, "job_field_" + f + "_value")

agg.nodes = [data.NodeData(), data.NodeData()]
for i, _ in enumerate(agg.nodes):
    agg.nodes[i].proc = data.ProcData()
    cls = agg.nodes[i].proc
    for f in dir(cls):
        if not f.startswith('__') and not callable(getattr(cls, f)) and getattr(cls, f) is None:
            setattr(agg.nodes[i].proc, f, "proc_" + str(i) + "_" + str(f) + "_value")
    cls = agg.nodes[i]
    for f in dir(cls):
        if not f.startswith('__') and not callable(getattr(cls, f)) and getattr(cls, f) is None:
            setattr(agg.nodes[i], f, "node_" + str(i) + "_" + str(f) + "_value")

# format and output JSON
pprint(format.format_ascii(agg))

aggr = aggregator.get_aggregator("2368599", "ascii")
pprint(format.format_ascii(aggr))