Commit e3a1ca2a authored by Azat Khuziyakhmetov's avatar Azat Khuziyakhmetov
Browse files

implemented some rules for the recommendation system

parent 1fab9075
...@@ -55,13 +55,15 @@ class NodeData: ...@@ -55,13 +55,15 @@ class NodeData:
phys_thr_core = None phys_thr_core = None
main_mem = None main_mem = None
seq_cpu_usage = None seq_cpu_usage = None
seq_load = None seq_load_avg = None
seq_load_max = None
proc = None proc = None
alloc_cu = None alloc_cu = None
def __init__(self): def __init__(self):
self.proc = ProcData() self.proc = ProcData()
self.seq_load = SeqVals() self.seq_load_avg = SeqVals()
self.seq_load_max = SeqVals()
self.seq_cpu_usage = SeqVals() self.seq_cpu_usage = SeqVals()
......
...@@ -26,20 +26,20 @@ def get_query_aggr_per_node_for_job(aggr, metric, job_id, metric_db, measurement ...@@ -26,20 +26,20 @@ def get_query_aggr_per_node_for_job(aggr, metric, job_id, metric_db, measurement
return query return query
def get_query_aggr_for_nodes(aggr, metric, nodes, metric_db, measurement, delta, t_start, t_end): def get_query_aggr_for_nodes(gaggr, aggr, metric, nodes, metric_db, measurement, delta, t_start, t_end):
aggr = '{:s}("{:s}") as "{:s}"'.format(aggr, metric_db, metric) aggr = '{:s}("{:s}") as "{:s}"'.format(aggr, metric_db, metric)
node_cond = ' OR '.join("host='{:s}'".format(n) for n in nodes) node_cond = ' OR '.join("host='{:s}'".format(n) for n in nodes)
query = ( query = (
'SELECT mean({met}) as "{met}" FROM (' 'SELECT {gagr}({met}) as "{met}" FROM ('
'SELECT {agr} FROM "{mes}" ' 'SELECT {agr} FROM "{mes}" '
'WHERE ({nodes}) ' 'WHERE ({nodes}) '
'AND time >= {start}s AND time <= {end}s ' 'AND time >= {start}s AND time <= {end}s '
'GROUP BY "host", time({dbdelta}s)) ' 'GROUP BY "host", time({dbdelta}s)) '
'WHERE time >= {start}s AND time <= {end}s ' 'WHERE time >= {start}s AND time <= {end}s '
'GROUP BY "host", time({delta}s);' 'GROUP BY "host", time({delta}s);'
).format(agr = aggr, mes = measurement, nodes = node_cond, ).format(gagr = gaggr, agr = aggr, mes = measurement, nodes = node_cond,
start = t_start, end = t_end, delta = delta, met = metric, start = t_start, end = t_end, delta = delta, met = metric,
dbdelta = DB_INTERVAL) dbdelta = DB_INTERVAL)
...@@ -51,7 +51,10 @@ def get_query_sum_per_node_of_job(metric, job_id, metric_db, measurement, delta, ...@@ -51,7 +51,10 @@ def get_query_sum_per_node_of_job(metric, job_id, metric_db, measurement, delta,
def get_query_avg_of_node(metric, nodes, metric_db, measurement, delta, t_start, t_end):
    """Return the per-node averaging query for the given hosts.

    Thin wrapper: calls ``get_query_aggr_for_nodes`` with "mean" as both the
    outer (``gaggr``) and inner (``aggr``) aggregation and returns its result
    unchanged. All other arguments are forwarded as received.
    """
    aggregation = "mean"
    return get_query_aggr_for_nodes(
        aggregation, aggregation, metric, nodes, metric_db, measurement,
        delta, t_start, t_end)
def get_query_max_of_node(metric, nodes, metric_db, measurement, delta, t_start, t_end):
    """Return the per-node maximum query for the given hosts.

    Thin wrapper: calls ``get_query_aggr_for_nodes`` with "max" as both the
    outer (``gaggr``) and inner (``aggr``) aggregation and returns its result
    unchanged. All other arguments are forwarded as received.
    """
    aggregation = "max"
    return get_query_aggr_for_nodes(
        aggregation, aggregation, metric, nodes, metric_db, measurement,
        delta, t_start, t_end)
def get_seq_queries(job_id, t_start, t_end, delta, nodes): def get_seq_queries(job_id, t_start, t_end, delta, nodes):
...@@ -64,6 +67,9 @@ def get_seq_queries(job_id, t_start, t_end, delta, nodes): ...@@ -64,6 +67,9 @@ def get_seq_queries(job_id, t_start, t_end, delta, nodes):
elif par["query"] is metrics.QType.AVG_OF_NODE: elif par["query"] is metrics.QType.AVG_OF_NODE:
q = get_query_avg_of_node( q = get_query_avg_of_node(
m, nodes, par["dbname"], par["measurement"], delta, t_start, t_end) m, nodes, par["dbname"], par["measurement"], delta, t_start, t_end)
elif par["query"] is metrics.QType.MAX_OF_NODE:
q = get_query_max_of_node(
m, nodes, par["dbname"], par["measurement"], delta, t_start, t_end)
else: else:
raise ValueError("Unknown query type: {:s}".format(par["query"])) raise ValueError("Unknown query type: {:s}".format(par["query"]))
......
...@@ -78,7 +78,8 @@ def get_aggregator(job_id, type="ascii"): ...@@ -78,7 +78,8 @@ def get_aggregator(job_id, type="ascii"):
new_node.seq_cpu_usage = SeqVals(data["seq_delta"], data["seq"][node_id]["node_cpu_idle_avg"]) new_node.seq_cpu_usage = SeqVals(data["seq_delta"], data["seq"][node_id]["node_cpu_idle_avg"])
new_node.seq_cpu_usage.seq = [None if x is None else (100 - x) for x in new_node.seq_cpu_usage.seq] new_node.seq_cpu_usage.seq = [None if x is None else (100 - x) for x in new_node.seq_cpu_usage.seq]
new_node.seq_load = SeqVals(data["seq_delta"], data["seq"][node_id]["node_load_avg"]) new_node.seq_load_avg = SeqVals(data["seq_delta"], data["seq"][node_id]["node_load_avg"])
new_node.seq_load_max = SeqVals(data["seq_delta"], data["seq"][node_id]["node_load_max"])
aggr.nodes.append(new_node) aggr.nodes.append(new_node)
......
...@@ -7,6 +7,7 @@ class QType(Enum): ...@@ -7,6 +7,7 @@ class QType(Enum):
AVG_SUM_PER_INT = 3 AVG_SUM_PER_INT = 3
SUM_PER_NODE = 4 SUM_PER_NODE = 4
AVG_OF_NODE = 5 AVG_OF_NODE = 5
MAX_OF_NODE = 6
class MType(Enum): class MType(Enum):
...@@ -160,4 +161,10 @@ metrics_seq = { ...@@ -160,4 +161,10 @@ metrics_seq = {
"query": QType.AVG_OF_NODE, "query": QType.AVG_OF_NODE,
"measurement": conf.measurements["sys"], "measurement": conf.measurements["sys"],
}, },
"node_load_max": {
"dbname": "load1",
"type": MType.FLT,
"query": QType.MAX_OF_NODE,
"measurement": conf.measurements["sys"],
},
} }
...@@ -94,7 +94,7 @@ class OutputSchemes: ...@@ -94,7 +94,7 @@ class OutputSchemes:
}, },
"node_load": { "node_load": {
"type": "seq", "type": "seq",
"path": "seq_load", "path": "seq_load_avg",
"scheme": "std" "scheme": "std"
}, },
"proc_cpu_usage": { "proc_cpu_usage": {
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
# each attribute should have finite number of values # each attribute should have finite number of values
from db.aggrstruct import * from db.aggrstruct import *
from . import helpers
DEBUG = False DEBUG = False
...@@ -17,7 +18,7 @@ def rcmattr(func): ...@@ -17,7 +18,7 @@ def rcmattr(func):
raise TypeError("argument of attribute should be Aggregator") raise TypeError("argument of attribute should be Aggregator")
printd(" - evaluation of the attribute {:s} - ".format(func.__name__)) printd(" - evaluation of the attribute {:s} - ".format(func.__name__))
res = func(Aggr, *args, **kwargs) res = func(Aggr, *args, **kwargs)
printd(" - finished the attribute {:s} - ".format(func.__name__)) printd(" - finished the attribute {:s} ({}) - ".format(func.__name__, res))
return res return res
return wrapper return wrapper
...@@ -25,6 +26,9 @@ class Attributes: ...@@ -25,6 +26,9 @@ class Attributes:
@rcmattr @rcmattr
def cpu_usage(Aggr): def cpu_usage(Aggr):
CPUDIFF_HIGH = 1
CPUDIFF_LOW = 4
cpu_usage = 0 cpu_usage = 0
cpu_usage_sum = 0 cpu_usage_sum = 0
...@@ -33,15 +37,103 @@ class Attributes: ...@@ -33,15 +37,103 @@ class Attributes:
for node in Aggr.nodes: for node in Aggr.nodes:
cpu_usage_sum += node.proc.cpu_usage cpu_usage_sum += node.proc.cpu_usage
cpu_fact = cpu_usage_sum / cpu_requested / 100 cpu_ratio = cpu_usage_sum / cpu_requested / 100
cpu_diff = abs(cpu_usage_sum / 100 - cpu_requested)
printd("{} = {} (total cpu_usage) / {} (requested cores) / {}". printd("{} = {} (total cpu_usage) / {} (requested cores) / {}".
format(cpu_fact, cpu_usage_sum, cpu_requested, 100)) format(cpu_ratio, cpu_usage_sum, cpu_requested, 100))
res = "MED" res = "MED"
if cpu_fact > 1.1: if cpu_ratio > 1 and cpu_diff >= CPUDIFF_HIGH:
res = "HIGH" res = "HIGH"
elif cpu_fact < 0.7: elif cpu_ratio < 1 and cpu_diff > CPUDIFF_LOW:
res = "LOW" res = "LOW"
return res return res
@rcmattr
def mem_usage_total(Aggr):
    """Classify the job's summed peak memory against its allocated share.

    For every node the allocated share is the node's main memory divided by
    its core count (sockets * cores per socket), multiplied by the allocated
    compute units. The per-node ``mem_rss_max`` values are summed and compared
    with the summed allocated share.

    Returns:
        "HIGH" if usage exceeds the allocated share, "LOW" if usage is below
        half of it, otherwise "MED". Both "HIGH" and "LOW" additionally
        require the absolute difference to exceed ``MEMDIFF_HIGH``.
    """
    # Minimum absolute difference before a deviation is reported
    # (1024**3; one GiB if the values are in bytes -- TODO confirm units).
    MEMDIFF_HIGH = 1024*1024*1024

    requested = 0
    used = 0
    for node in Aggr.nodes:
        cores = node.sockets * node.cores_per_socket
        requested += node.main_mem / cores * node.alloc_cu
        used += node.proc.mem_rss_max

    ratio = used / requested
    difference = abs(used - requested)

    printd("mem total = {}, mem used {}, ratio {}".
        format(requested, used, ratio))

    # Small absolute deviations are never reported, whatever the ratio is.
    if difference > MEMDIFF_HIGH:
        if ratio > 1:
            return "HIGH"
        if ratio < 0.5:
            return "LOW"
    return "MED"
@rcmattr
def max_proc_ratio(Aggr):
    """Classify the largest per-node ratio of CPU usage to allocated units.

    The ratio of a node is ``(cpu_usage / 100) / alloc_cu`` (presumably
    cpu_usage is a percentage where 100 equals one busy core -- TODO confirm).
    The running maximum starts at 0, so it is never negative.

    Returns:
        "HIGH" if the largest ratio is above 1.1, "LOW" if it is below 0.9,
        otherwise "MED".
    """
    ratios = [(node.proc.cpu_usage / 100) / node.alloc_cu for node in Aggr.nodes]
    # The leading 0 is the floor of the maximum and covers an empty node list.
    peak = max([0] + ratios)

    if peak > 1.1:
        return "HIGH"
    if peak < 0.9:
        return "LOW"
    return "MED"
@rcmattr
def min_proc_ratio(Aggr):
    """Classify the smallest per-node ratio of CPU usage to allocated units.

    The ratio of a node is ``(cpu_usage / 100) / alloc_cu`` (presumably
    cpu_usage is a percentage where 100 equals one busy core -- TODO confirm).

    Returns:
        "HIGH" if even the least busy node has a ratio above 1.1,
        "ZERO" if some node has a ratio of exactly 0,
        "LOW" if the smallest ratio is below 0.9,
        otherwise "MED" (also returned when there are no nodes).
    """
    ratios = [(node.proc.cpu_usage / 100) / node.alloc_cu for node in Aggr.nodes]

    # The minimum is taken over the real node ratios only. Seeding the search
    # with 1 would cap the result at 1 and make the "HIGH" branch unreachable.
    # With no nodes the neutral value 1 is used, which classifies as "MED".
    smallest = min(ratios) if ratios else 1

    printd("min ratio = {}".format(smallest))

    if smallest > 1.1:
        return "HIGH"
    if smallest == 0:
        return "ZERO"
    if smallest < 0.9:
        return "LOW"
    return "MED"
@rcmattr
def duration_ratio(Aggr):
    """Classify how much of the requested time the job actually ran.

    The ratio is ``Aggr.job.run_time / Aggr.job.requested_time``.

    Returns:
        "HIGH" if the ratio is above 0.9,
        "MED" if it is above 0.5,
        "LOW" otherwise,
        "UNKNOWN" if the requested time is missing or zero, because no ratio
        can be computed then.
    """
    requested_time = Aggr.job.requested_time
    if not requested_time:
        # Avoids a division by zero (or by None) for jobs without a limit.
        return "UNKNOWN"

    dur_ratio = Aggr.job.run_time / requested_time

    if dur_ratio > 0.9:
        return "HIGH"
    if dur_ratio > 0.5:
        # Middle band, labelled like the other three-level attributes.
        return "MED"
    return "LOW"
@rcmattr
def overloaded_node_exists(Aggr):
    """Tell whether any candidate node shows a high-load interval.

    Nodes whose process CPU usage is above 90 are not examined (presumably
    because load on such nodes is attributed to the job itself -- TODO
    confirm). Every other node is passed to
    ``helpers.node_has_high_load_interval``; evaluation stops at the first
    node for which it reports a high-load interval.

    Returns:
        True if such a node exists, False otherwise.
    """
    return any(
        helpers.node_has_high_load_interval(node)
        for node in Aggr.nodes
        if not node.proc.cpu_usage > 90
    )
import sys
import math
from db.aggrstruct import *
LONG_DUR_SEC = 60
def _type_mismatch_msg(fname, expected, actual):
    '''Return a readable message describing an argument type mismatch.'''
    def names(types):
        return ', '.join(getattr(t, '__name__', str(t)) for t in types)

    return "'{}' method accepts ({}), but was given ({})".format(
        fname, names(expected), names(actual))


def accepts(*types, **kw):
    '''Function decorator. Checks decorated function's arguments are
    of the expected types.

    Parameters:
    types -- The expected types of the inputs to the decorated function.
             Must specify type for each parameter. Arguments are checked
             with isinstance, so instances of subclasses are accepted.
    kw    -- Optional specification of 'debug' level (this is the only valid
             keyword argument, no other should be given).
             debug = ( 0 | 1 | 2 )
             0 -- no checking, the function is called directly
             1 -- a mismatch writes a 'TypeWarning' line to stderr and the
                  function is still called (default)
             2 -- a mismatch raises TypeError

    Raises:
    KeyError -- if a keyword argument other than 'debug' is given.
    '''
    # Local import: keeps this block independent of the module import list.
    import functools

    unknown = sorted(key for key in kw if key != 'debug')
    if unknown:
        raise KeyError("{} is not a valid keyword argument".format(unknown[0]))
    # default level: MEDIUM
    debug = kw.get('debug', 1)

    def decorator(f):
        @functools.wraps(f)
        def newf(*args):
            # Levels are compared by value; identity checks on ints are
            # implementation dependent.
            if debug == 0:
                return f(*args)
            assert len(args) == len(types), \
                "'{}' expects {} argument(s), got {}".format(
                    f.__name__, len(types), len(args))
            if not all(isinstance(a, t) for a, t in zip(args, types)):
                msg = _type_mismatch_msg(
                    f.__name__, types, tuple(map(type, args)))
                if debug == 1:
                    # stderr.write behaves the same on Python 2 and 3.
                    sys.stderr.write('TypeWarning:  {}\n'.format(msg))
                elif debug == 2:
                    raise TypeError(msg)
            return f(*args)
        return newf
    return decorator
@accepts(NodeData)
def node_has_high_load_interval(node):
    '''Tell whether the node's maximum-load sequence has a sustained high run.

    A sample counts as high when it is at least the node's core count
    (sockets * cores per socket). The function reports True as soon as the
    number of consecutive high samples reaches the required run length.
    Samples that are None neither extend nor reset the current run.

    Parameters:
    node -- NodeData whose seq_load_max sequence is examined.

    Returns:
    True if a sufficiently long run of high samples exists, else False.
    '''
    seq = node.seq_load_max

    # The sampling interval is read from this node's own sequence (assumes
    # SeqVals instances expose it as .delta -- TODO confirm), not from the
    # SeqVals class.
    # NOTE(review): the run length is delta / LONG_DUR_SEC as in the original;
    # if it is meant to be "samples needed to cover LONG_DUR_SEC seconds" the
    # quotient would be the other way round -- confirm the intent.
    max_points = math.ceil(seq.delta / LONG_DUR_SEC)
    max_load = node.sockets * node.cores_per_socket

    conseq_points = 0
    for p in seq.seq:
        if p is None:
            continue
        if p >= max_load:
            conseq_points += 1
            # Checked right after counting so that a run which ends on the
            # last sample is detected as well.
            if conseq_points >= max_points:
                return True
        else:
            conseq_points = 0

    return False
@accepts(Aggregator)
def has_high_load_interval(aggr):
    '''Tell whether any node of the aggregator has a high-load interval.

    Parameters:
    aggr -- Aggregator whose nodes are examined one by one with
            node_has_high_load_interval; evaluation stops at the first hit.

    Returns:
    True if at least one node reports a high-load interval, else False.
    '''
    return any(node_has_high_load_interval(node) for node in aggr.nodes)
...@@ -11,7 +11,13 @@ def attributes_match(attrs, Aggr): ...@@ -11,7 +11,13 @@ def attributes_match(attrs, Aggr):
except AttributeError: except AttributeError:
raise AttributeError("there is no attribute " + rule_attr) raise AttributeError("there is no attribute " + rule_attr)
if attr_func(Aggr) != attr_value: expected_vals = []
if isinstance(attr_value, list):
expected_vals.extend(attr_value)
else:
expected_vals.append(attr_value)
if attr_func(Aggr) not in expected_vals:
matches = False matches = False
break break
......
# Recommendation rules. Each rule has:
#   "attrs" -- attribute function names mapped to the expected result; the
#              expected result may be a single value or a list of accepted
#              values.
#   "msg"   -- the recommendation text belonging to the rule (presumably shown
#              when all attributes match -- verify against the caller).
RULES = [
    {"attrs": {"cpu_usage": "HIGH"}, "msg": "request more compute units"},
    {"attrs": {"cpu_usage": "LOW", "mem_usage_total": "LOW"}, "msg": "request less compute units"},
    {"attrs": {"mem_usage_total": "HIGH"}, "msg": "Maximum memory usage exceeds requested amount"},
    {"attrs": {"min_proc_ratio": "HIGH"}, "msg": "All nodes have more processes than requested"},
    {"attrs": {"min_proc_ratio": "ZERO"}, "msg": "Some nodes were not used at all"},
    {"attrs": {"overloaded_node_exists": True}, "msg": "Some nodes are overloaded. Probably by other processes"},
    {"attrs": {"duration_ratio": "LOW"}, "msg": "The requested walltime is too high. Try to request less time"},
]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment