Commit 07f13cb9 authored by akhuziy

added GPU data aggregation

parent 559b4ebc
......@@ -36,7 +36,12 @@ BATCH_SYSTEM = "SLURM" # LSF, SLURM
# Network
INFINIBAND = True # aggregate infiniband data
# Filesystems
BEEGFS = True # aggregate beegfs data
# GPU
GPU = True # aggregate gpu data
# PDF report
SEQ_MAX_POINTS = 500
......@@ -30,6 +30,27 @@ class ProcData:
self.seq_write_sum = SeqVals()
self.seq_read_sum = SeqVals()
class GPUData:
bus = None
power_limit = None
mem = None
mem_max = None
temp_max = None
power_max = None
usage_max = None
cpu_usage_max = None
cpu_mem_rss_max = None
cpu_proc_total = None
def __init__(self):
self.seq_mem_avg = SeqVals()
self.seq_usage_avg = SeqVals()
self.seq_temp_avg = SeqVals()
self.seq_power_avg = SeqVals()
self.seq_cpu_usage_sum = SeqVals()
self.seq_cpu_mem_rss_sum = SeqVals()
self.seq_cpu_proc_count = SeqVals()
class AllocCUData:
node_id = None
cu_count = None
......@@ -72,6 +93,7 @@ class NodeData:
def __init__(self):
self.proc = ProcData()
self.gpus = [GPUData()]
self.seq_load_avg = SeqVals()
self.seq_load_max = SeqVals()
self.seq_cpu_usage = SeqVals()
......
# db/common.py common functions
import requests
import json
from requests.auth import HTTPBasicAuth
from conf import influxdb as confdb
from . import metrics
......@@ -14,7 +15,9 @@ def fetch_data(query):
# status code 200 is not the only one that returns results, so we need to find out which status codes are good and which are bad.
if r.status_code != 200:
raise RuntimeError(
"Couldn't retrieve the data from InfluxDB. Code:", r.status_code)
"Couldn't retrieve the data from InfluxDB. Code: {}, Response: {}".format(
r.status_code, json.dumps(r.json()))
)
result = r.json()['results']
......
# this script fetches all GPU-related data of the job's nodes
# and returns a parsed dictionary
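# Illustrative shape of the parsed result (hypothetical host and bus IDs):
# {
#     "node01": {
#         "00000000:3B:00.0": {
#             "gpu_power_limit": 250,
#             "gpu_memory_max": 16280,
#             "gpu_memory_avg_seq": [10054.2, 11021.7, ...],
#             ...
#         }
#     }
# }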
import math
from collections import defaultdict
import conf.config as conf
from . import common
from . import metrics
DB_INTERVAL = conf.METRIC_INTERVAL
DB_GPU_MES = conf.measurements["gpu_node"]
DB_GPU_PROC_MES = conf.measurements["gpu_proc"]
def get_query_used_gpus(job_id):
mes = DB_GPU_PROC_MES
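# any existing field works here; the query only needs the last point grouped by the host/bus tags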
random_field = metrics.metrics_gpu["gpu_cpu_usage_max"]["dbname"]
query = (
'SELECT last({field}) as "{field}" FROM "{m}" '
'WHERE ("JOBID" = \'{jobid}\') '
'GROUP BY "host", "bus";'
).format(m = mes, jobid = job_id, field = random_field)
return query
def get_query_proc_count_per_device_interval_seq(metric, t_start, t_end, delta, job_id):
mes = DB_GPU_PROC_MES
query = (
'SELECT count(distinct("pid")) as "{met}" FROM "{m}" '
'WHERE ("JOBID" = \'{jobid}\') '
'AND time >= {ts:d}s AND time <= {te:d}s '
'GROUP BY "host", "bus", time({i:d}s);'
).format(m = mes, jobid = job_id, i = delta, met = metric,
ts = t_start, te = t_end
)
return query
def get_query_proc_count_per_device_total(metric, job_id):
mes = DB_GPU_PROC_MES
query = (
'SELECT count(distinct("pid")) as "{met}" FROM "{m}" '
'WHERE ("JOBID" = \'{jobid}\') '
'GROUP BY "host", "bus";'
).format(m = mes, jobid = job_id, met = metric)
return query
def format_gpu_node_list(gpus, gpu_bind = True):
final_cond = ""
if gpu_bind is False:
final_cond = ' OR '.join("host='{:s}'".format(n) for n in gpus)
else:
nodes_fmtd = []
for node, buses in gpus.items():
buses_fmtd = ' OR '.join("bus='{:s}'".format(b) for b in buses)
nodes_fmtd.append("host='{node}' AND ({buses})".format(node = node, buses = buses_fmtd))
final_cond = ' OR '.join("({:s})".format(n) for n in nodes_fmtd)
return final_cond
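# For example (hypothetical values), gpus = {"node01": ["00000000:3B:00.0", "00000000:D8:00.0"]}
# with gpu_bind = True yields:
# (host='node01' AND (bus='00000000:3B:00.0' OR bus='00000000:D8:00.0'))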
def get_query_gpu_last(metric, metric_db, t_start, t_end, gpus):
node_cond = format_gpu_node_list(gpus)
mes = DB_GPU_MES
query = (
'SELECT last("{metdb}") as "{met}" FROM "{mes}" '
'WHERE ({nodes}) '
'AND time >= {ts:d}s AND time <= {te:d}s '
'GROUP BY "host", "bus";'
).format(met = metric, metdb = metric_db, nodes = node_cond, mes = mes,
ts = t_start, te = t_end
)
return query
def get_query_aggr_per_device(metric, metric_db, t_start, t_end, gpus, aggr):
aggr_fmtd = '{:s}("{:s}") as "{:s}"'.format(aggr, metric_db, metric)
node_cond = format_gpu_node_list(gpus)
mes = DB_GPU_MES
query = (
'SELECT {aggr:s} FROM "{mes:s}" '
'WHERE ({nodes}) '
'AND time >= {ts:d}s AND time <= {te:d}s '
'GROUP BY "host", "bus";'
).format(aggr = aggr_fmtd, mes = mes,
nodes = node_cond, ts = t_start, te = t_end
)
return query
def get_query_aggr_per_device_seq(metric, metric_db, t_start, t_end, delta, gpus, aggr):
aggr_fmtd = '{:s}("{:s}") as "{:s}"'.format(aggr, metric_db, metric)
node_cond = format_gpu_node_list(gpus)
mes = DB_GPU_MES
query = (
'SELECT {aggr} FROM "{mes}" '
'WHERE ({nodes}) '
'AND time >= {ts:d}s AND time <= {te:d}s '
'GROUP BY "host", "bus", time({delta}s);'
).format(aggr = aggr_fmtd, mes = mes, nodes = node_cond,
ts = t_start, te = t_end, delta = delta
)
return query
def get_query_proc_aggr_per_bus_for_job(metric, metric_db, t_start, t_end, delta, job_id, aggr_in, aggr_out):
mes = DB_GPU_PROC_MES
query = (
'SELECT {agr_out}("{met}") as "{met}" FROM ('
'SELECT {agr_in}("{mdb}") as "{met}" FROM "{mes}" '
'WHERE "JOBID" = \'{job}\' '
'AND time >= {ts:d}s AND time <= {te:d}s '
'GROUP BY "host", "bus", time({dbdelta}s)) '
'WHERE time >= {ts:d}s AND time <= {te:d}s '
'GROUP BY "host", "bus";'
).format(agr_out = aggr_out, agr_in = aggr_in, mes = mes, job = job_id,
ts = t_start, te = t_end, delta = delta, met = metric,
dbdelta = DB_INTERVAL, mdb = metric_db)
return query
def get_query_proc_aggr_per_bus_for_job_seq(metric, metric_db, t_start, t_end, delta, job_id, aggr):
aggr = '{:s}("{:s}") as "{:s}"'.format(aggr, metric_db, metric)
mes = DB_GPU_PROC_MES
query = (
'SELECT mean("{met}") as "{met}" FROM ('
'SELECT {agr} FROM "{mes}" '
'WHERE "JOBID" = \'{job}\' '
'AND time >= {ts:d}s AND time <= {te:d}s '
'GROUP BY "host", "bus", time({dbdelta}s)) '
'WHERE time >= {ts:d}s AND time <= {te:d}s '
'GROUP BY "host", "bus", time({delta}s);'
).format(agr = aggr, mes = mes, job = job_id,
ts = t_start, te = t_end, delta = delta, met = metric,
dbdelta = DB_INTERVAL)
return query
def parse_gpu_used(data):
nodes = defaultdict(list)
if "series" not in data[0]:
if "statement_id" in data[0]:
if conf.DEBUG:
print("Could not execute query to get used GPUs", data[0])
return nodes
else:
raise RuntimeError("Cannot parse the output of GPU count query", data[0])
for s in range(0, len(data[0]['series'])):
bus = data[0]['series'][s]['tags']['bus']
host = data[0]['series'][s]['tags']['host']
nodes[host].append(bus)
return nodes
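# For example (hypothetical values), the result may look like:
# {"node01": ["00000000:3B:00.0"], "node02": ["00000000:3B:00.0", "00000000:D8:00.0"]}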
def get_gpu_used_nodes(job_id):
query = get_query_used_gpus(job_id)
gpu_used_raw = common.fetch_data(query)
nodes = parse_gpu_used(gpu_used_raw)
return nodes
def get_gpu_queries(job_id, t_start, t_end, gpus, delta):
queries = ""
for m, par in metrics.metrics_gpu.items():
if par["query"] is metrics.QType.LAST_OF_DEVICE:
q = get_query_gpu_last(m, par["dbname"], t_start, t_end, gpus)
elif par["query"] is metrics.QType.MAX_PER_DEVICE:
q = get_query_aggr_per_device(m, par["dbname"], t_start, t_end, gpus, "max")
elif par["query"] is metrics.QType.AVG_PER_DEVICE_INT:
q = get_query_aggr_per_device_seq(m, par["dbname"], t_start, t_end, delta, gpus, "mean")
elif par["query"] is metrics.QType.MAX_SUM_PER_INT:
q = get_query_proc_aggr_per_bus_for_job(m, par["dbname"], t_start, t_end, delta, job_id, "sum", "max")
elif par["query"] is metrics.QType.PROC_SUM_PER_DEVICE_INT:
q = get_query_proc_aggr_per_bus_for_job_seq(m, par["dbname"], t_start, t_end, delta, job_id, "sum")
elif par["query"] is metrics.QType.CUSTOM:
if m == "gpu_cpu_proc_count_total":
q = get_query_proc_count_per_device_total(m, job_id)
elif m == "gpu_cpu_proc_count_sum_seq":
q = get_query_proc_count_per_device_interval_seq(m, t_start, t_end, delta, job_id)
else:
raise ValueError('Unknown query type:', par["query"])
queries += q
return queries
def format_seq_val(metric, val):
if metric not in metrics.metrics_gpu:
return val
fmtd = common.format_value(metrics.metrics_gpu[metric]["type"], val)
return fmtd
def parse_gpu_response(data, queries):
nodes = {}
queries = queries.split(";")
for i in range(0, len(data)):
if "series" not in data[i]:
if "statement_id" in data[i]:
sid = data[i]["statement_id"]
if len(queries) > sid:
if conf.DEBUG:
print("The query was not executed", queries[sid])
else:
if conf.DEBUG:
print("Some queries were not executed", data[i])
else:
raise RuntimeError("Cannot parse the output", data[i])
continue
for s in range(0, len(data[i]['series'])):
series = data[i]['series'][s]
host = series['tags']['host']
bus = series['tags']['bus']
metric = series['columns'][1]
if len(series['values']) == 1:
if host not in nodes: nodes[host] = defaultdict(dict)
nodes[host][bus][metric] = series['values'][0][1]
elif len(series['values']) > 1:
if host not in nodes: nodes[host] = defaultdict(dict)
nodes[host][bus][metric] = []
for v in range(0, len(series['values'])):
nodes[host][bus][metric].append(
format_seq_val(metric, series['values'][v][1])
)
return nodes
def format_data(gpuinfo_init):
gpuinfo = {}
for node, busses in gpuinfo_init.items():
gpuinfo[node] = defaultdict(dict)
for bus, bus_vars in busses.items():
for name, par in metrics.metrics_gpu.items():
raw_value = bus_vars.get(name, None)
if isinstance(raw_value, list):
gpuinfo[node][bus][name] = raw_value
else:
gpuinfo[node][bus][name] = common.format_metric(metrics.metrics_gpu, name, raw_value)
return gpuinfo
def get_gpu_data(job_id, t_start, t_end, node_ids):
delta = math.ceil((t_end - t_start) / conf.SEQ_MAX_POINTS)
delta = max(delta, conf.METRIC_INTERVAL)
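# e.g. (illustrative) a job spanning 10000 s with SEQ_MAX_POINTS = 500 gives
# delta = ceil(10000 / 500) = 20 s, raised to METRIC_INTERVAL if that is larger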
used_gpus = get_gpu_used_nodes(job_id)
if len(used_gpus) == 0:
return {}
query = get_gpu_queries(job_id, t_start, t_end, used_gpus, delta)
gpudata_raw = common.fetch_data(query)
gpudata_parsed = parse_gpu_response(gpudata_raw, query)
gpudata = format_data(gpudata_parsed)
return gpudata
......@@ -3,6 +3,7 @@ from .influxdb_fetchjob import get_job_data
from .influxdb_fetchproc import get_proc_data
from .influxdb_fetchnode import get_node_data
from .influxdb_fetchseq import get_seq_data
from .influxdb_fetchgpu import get_gpu_data
from ..aggrstruct import *
def fetch_all(job_id, type):
......@@ -20,6 +21,9 @@ def fetch_all(job_id, type):
if type == "pdf":
data["seq"], data["seq_delta"] = get_seq_data(job_id, t_start, t_end, node_ids)
if conf.GPU is True:
data["gpu"] = get_gpu_data(job_id, t_start, t_end, node_ids)
return data
def get_aggregator(job_id, type="text"):
......@@ -80,8 +84,6 @@ def get_aggregator(job_id, type="text"):
if new_node.beegfs_write_sum is not None:
new_node.beegfs_write_sum *= 1024
procd = data["proc"].get(node_id)
new_node.proc.cpu_time_user = procd.get("cpu_time_user")
......@@ -98,6 +100,29 @@ def get_aggregator(job_id, type="text"):
new_node.proc.mem_vms = procd.get("mem_vms")
new_node.proc.cpu_usage = procd.get("cpu_usage")
if conf.GPU is True:
gpud = data["gpu"].get(node_id, None)
if gpud is not None:
busses = []
for bus_id, busd in gpud.items():
newbus = GPUData()
newbus.bus = bus_id
newbus.power_limit = busd["gpu_power_limit"]
newbus.mem = busd["gpu_memory_total"]
newbus.mem_max = busd["gpu_memory_max"]
newbus.temp_max = busd["gpu_temperature_max"]
newbus.power_max = busd["gpu_power_max"]
newbus.usage_max = busd["gpu_utilization_max"]
newbus.cpu_usage_max = busd["gpu_cpu_usage_max"]
newbus.cpu_mem_rss_max = busd["gpu_cpu_mem_max"]
newbus.cpu_proc_total = busd["gpu_cpu_proc_count_total"]
busses.append(newbus)
new_node.gpus = busses
if type == "pdf":
new_node.proc.seq_cpu_usage = SeqVals(data["seq_delta"], data["seq"][node_id].get("proc_cpu_usage_sum"))
new_node.proc.seq_mem_rss_sum = SeqVals(data["seq_delta"], data["seq"][node_id].get("proc_mem_rss_sum"))
......@@ -127,6 +152,19 @@ def get_aggregator(job_id, type="text"):
if new_node.seq_beegfs_write_sum.seq is not None:
new_node.seq_beegfs_write_sum.seq = [None if x is None else (4 * x) for x in new_node.seq_beegfs_write_sum.seq]
if conf.GPU is True and gpud is not None:
for i in range(0, len(new_node.gpus)):
bus_id = new_node.gpus[i].bus
busd = data["gpu"][node_id][bus_id]
new_node.gpus[i].seq_mem_avg = SeqVals(data["seq_delta"], busd["gpu_memory_avg_seq"])
new_node.gpus[i].seq_usage_avg = SeqVals(data["seq_delta"], busd["gpu_utilization_avg_seq"])
new_node.gpus[i].seq_temp_avg = SeqVals(data["seq_delta"], busd["gpu_temperature_avg_seq"])
new_node.gpus[i].seq_power_avg = SeqVals(data["seq_delta"], busd["gpu_power_avg_seq"])
new_node.gpus[i].seq_cpu_usage_sum = SeqVals(data["seq_delta"], busd["gpu_cpu_usage_sum_seq"])
new_node.gpus[i].seq_cpu_mem_rss_sum = SeqVals(data["seq_delta"], busd["gpu_cpu_mem_sum_seq"])
new_node.gpus[i].seq_cpu_proc_count = SeqVals(data["seq_delta"], busd["gpu_cpu_proc_count_sum_seq"])
aggr.nodes.append(new_node)
return aggr
......@@ -10,7 +10,15 @@ class QType(Enum):
AVG_OF_NODE = 5
MAX_OF_NODE = 6
SUM_OF_NODE = 7
LAST_OF_NODE = 8
PER_DEVICE = 9
AVG_PER_DEVICE = 10
MAX_PER_DEVICE = 11
LAST_OF_DEVICE = 12
AVG_PER_DEVICE_INT = 13
MAX_PER_DEVICE_INT = 14
AVG_PER_PROC_ON_DEVICE_INT = 15
PROC_SUM_PER_DEVICE_INT = 16
class MType(Enum):
INT = 1
......@@ -242,3 +250,103 @@ metrics_seq = {
"measurement": conf.measurements["beegfs"],
},
}
metrics_gpu = {
"gpu_power_limit": {
"dbname": "power.limit",
"type": MType.INT,
"query": QType.LAST_OF_DEVICE,
"measurement": conf.measurements["gpu_node"],
},
"gpu_memory_total": {
"dbname": "memory.total",
"type": MType.INT,
"query": QType.LAST_OF_DEVICE,
"measurement": conf.measurements["gpu_node"],
},
"gpu_temperature_max": {
"dbname": "temperature.gpu",
"type": MType.INT,
"query": QType.MAX_PER_DEVICE,
"measurement": conf.measurements["gpu_node"],
},
"gpu_power_max": {
"dbname": "power.draw",
"type": MType.FLT,
"query": QType.MAX_PER_DEVICE,
"measurement": conf.measurements["gpu_node"],
},
"gpu_memory_max": {
"dbname": "memory.used",
"type": MType.INT,
"query": QType.MAX_PER_DEVICE,
"measurement": conf.measurements["gpu_node"],
},
"gpu_utilization_max": {
"dbname": "utilization.gpu",
"type": MType.INT,
"query": QType.MAX_PER_DEVICE,
"measurement": conf.measurements["gpu_node"],
},
"gpu_memory_avg_seq": {
"dbname": "memory.used",
"type": MType.FLT,
"query": QType.AVG_PER_DEVICE_INT,
"measurement": conf.measurements["gpu_node"],
},
"gpu_utilization_avg_seq": {
"dbname": "utilization.gpu",
"type": MType.FLT,
"query": QType.AVG_PER_DEVICE_INT,
"measurement": conf.measurements["gpu_node"],
},
"gpu_temperature_avg_seq": {
"dbname": "temperature.gpu",
"type": MType.INT,
"query": QType.AVG_PER_DEVICE_INT,
"measurement": conf.measurements["gpu_node"],
},
"gpu_power_avg_seq": {
"dbname": "power.draw",
"type": MType.FLT,
"query": QType.AVG_PER_DEVICE_INT,
"measurement": conf.measurements["gpu_node"],
},
"gpu_cpu_usage_max": {
"dbname": "cpu_pcpu",
"type": MType.INT,
"query": QType.MAX_SUM_PER_INT,
"measurement": conf.measurements["gpu_proc"],
},
"gpu_cpu_mem_max": {
"dbname": "cpu_rss",
"type": MType.INT,
"query": QType.MAX_SUM_PER_INT,
"measurement": conf.measurements["gpu_proc"],
},
"gpu_cpu_usage_sum_seq": {
"dbname": "cpu_pcpu",
"type": MType.INT,
"query": QType.PROC_SUM_PER_DEVICE_INT,
"measurement": conf.measurements["gpu_proc"],
},
"gpu_cpu_mem_sum_seq": {
"dbname": "cpu_rss",
"type": MType.INT,
"query": QType.PROC_SUM_PER_DEVICE_INT,
"measurement": conf.measurements["gpu_proc"],
},
"gpu_cpu_proc_count_total": {
"dbname": "pid",
"type": MType.INT,
"query": QType.CUSTOM,
"measurement": conf.measurements["gpu_proc"],
},
"gpu_cpu_proc_count_sum_seq": {
"dbname": "pid",
"type": MType.INT,
"query": QType.CUSTOM,
"measurement": conf.measurements["gpu_proc"],
},
}
......@@ -44,6 +44,22 @@ class OutputSchemes:
"ib_xmit_max": "ib_xmit_max",
"beegfs_read_sum": "beegfs_read_sum",
"beegfs_write_sum": "beegfs_write_sum",
"gpus": {
"type": "list",
"path": "gpus",
"scheme": {
"bus": "bus",
"power_limit": "power_limit",
"mem": "mem",
"mem_max": "mem_max",
"temp_max": "temp_max",
"power_max": "power_max",
"usage_max": "usage_max",
"cpu_usage_max": "cpu_usage_max",
"cpu_mem_rss_max": "cpu_mem_rss_max",
"cpu_proc_total": "cpu_proc_total",
}
}
}
}
}
......@@ -94,6 +110,59 @@ class OutputSchemes:
"ib_xmit_max": "ib_xmit_max",
"beegfs_read_sum": "beegfs_read_sum",
"beegfs_write_sum": "beegfs_write_sum",
"gpus": {
"type": "list",
"path": "gpus",
"scheme": {
"bus": "bus",
"power_limit": "power_limit",
"mem": "mem",
"mem_max": "mem_max",
"temp_max": "temp_max",
"power_max": "power_max",
"usage_max": "usage_max",
"cpu_usage_max": "cpu_usage_max",
"cpu_mem_rss_max": "cpu_mem_rss_max",
"cpu_proc_total": "cpu_proc_total",
"dynamic": {
"seq_mem_avg": {
"type": "seq",
"path": "seq_mem_avg",
"scheme": "std"
},
"seq_usage_avg": {
"type": "seq",
"path": "seq_usage_avg",
"scheme": "std"
},
"seq_temp_avg": {
"type": "seq",
"path": "seq_temp_avg",
"scheme": "std"
},