Commit 63375229 authored by Azat Khuziyakhmetov's avatar Azat Khuziyakhmetov

Moved aggregation part to its own repo.

parent f741be0c
syntax: glob
*.pyc
*.o
*.lo
*.la
*.a
*.x
*.oo
.git
.bzr
.svn
.DS_Store
.vscode
.deps
.sonarlint
.dist
.mypy_cache
__pycache__
/conf/main.py
out
/conf/user.py
/conf/influxdb.py
htmlcov
.coverage
sonar-project.properties
.scannerwork
# Aggregator
The tool aggregates data in a format which is suitable for generating reports. It collects and calculates the data for report generators. *Currently it also contains the script that exports `jobinfo` data into the DB from the post-execution script.*
## Requirements
The aggregator requires the following software packages to be installed, along with the required data being available in the DB.
- Python 3.
- InfluxDB. Currently the aggregator works only with InfluxDB, so it must be installed and configured as shown in the configuration section below.
- The DB should contain all required data specified in the [DB spec](https://gibraltar.chi.uni-hannover.de/profit-hpc/ProfiT-HPC/blob/d9a21af233ab373bf90420e3f0f0c05e1c65aef8/Internals/DB/InfluxDBspec.md).
## Configuration
Sample configurations can be stored in the repository. To use them, rename the corresponding templates with the `.sample` extension to `.py` and replace the placeholder values accordingly.
Real configs should be excluded from version control via the `.gitignore` file.
Sample configs must not be used in the code.
**Example**: `influxdb.sample` -> `influxdb.py`
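After renaming, the config modules are imported directly by the code. A minimal sketch (assuming `conf/influxdb.py` has been created from `influxdb.sample`, mirroring the import used in `db/common.py`):

```python
# assumes conf/influxdb.py exists and defines the IDB dict shown in the sample below
from conf import influxdb as confdb

print(confdb.IDB["api_url"])  # e.g. "http://127.0.0.1:8086"
```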
#! /usr/bin/env python3
# DEBUGGING
#############################
DEBUG = False
# INFLUXDB
#############################
# interval of measurements in seconds
METRIC_INTERVAL = 10
# names
measurements = {
"proc": 'pfit-uprocstat',
"jobs": 'pfit-jobinfo',
"node": 'pfit-nodeinfo',
}
# DB
#############################
job_info = {
"fetch_job_info_lsf": "bjobs -o \"{:s}\" {:s}",
"fetch_job_info_slurm": "sacct --format=\"{:s}\" -X -P -j {:s}",
"measurement_name": "pfit-jobinfo",
}
# Batch system
BATCH_SYSTEM = "SLURM" # LSF, SLURM
#! /usr/bin/env python3
IDB = {
"username": "root",
"password": "root",
"database": "telegraf",
"api_url": "http://127.0.0.1:8086",
"ssl": True, #True, False or Certfile Path
}
#! /usr/bin/env python3
import argparse
import json
import db.getproc
import db.getnode
import db.getjob
import db.metrics
import postexec.exportjobinfo as xjob
import conf.config as conf
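# format_output merges the per-node process metrics, static node info and allocated-CU
# counts into a single JSON-serialisable dict of the form
# {"job_id": ..., "general": {<job metrics>}, "nodes": [{"node_name": ..., <per-node metrics>}, ...]}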
def format_output(job_id, proc, job, nodes, alloc_cu):
nodenames = list(nodes.keys())
nodes_concat = []
for n in nodenames:
cc = {db.metrics.APInames.NODE_NAME: n, **proc[n], **nodes[n]}
# add allocated cu info
m = db.metrics.metrics_node["alloc_cu"]["apiname"]
if n in alloc_cu:
cc[m] = alloc_cu[n]
nodes_concat.append(cc)
o = {
"job_id": job_id,
"general": job,
"nodes": nodes_concat,
}
return o
def print_job_info(job_id):
proc = db.getproc.get_proc_data(job_id)
job, alloc_cu = db.getjob.get_job_data(job_id)
nodenames = list(proc.keys())
nodes = db.getnode.get_node_data(nodenames)
if conf.DEBUG:
print("jobinfo from INFLUXDB and REMOTE: {:s}".format(json.dumps(nodes)))
print("jobinfo from BATCH: {:s}".format(json.dumps(job)))
output = format_output(job_id, proc, job, nodes, alloc_cu)
print(json.dumps(output))
def main():
parser = argparse.ArgumentParser(description="""
Gets the job information required for generating an ASCII report
and outputs it in JSON format.
Optionally it stores the job info in DB.
""")
parser.add_argument("-s", "--save", help="save batch info of the job in DB",
action="store_true")
parser.add_argument("JOBID",
help="job ID used in the batch system")
args = parser.parse_args()
job_id = args.JOBID
if args.save:
res = xjob.export_job_info(job_id)
if res is not True:
return res
print("Job {} was exported".format(job_id))
return 0
print_job_info(job_id)
return 0
if __name__ == "__main__":
main()
# db/common.py common functions
import requests
from requests.auth import HTTPBasicAuth
from conf import influxdb as confdb
from . import metrics
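# fetch_data returns the "results" list of InfluxDB's JSON response. Each element
# corresponds to one statement and, when data is found, looks roughly like (illustrative):
# {"series": [{"name": ..., "tags": {"host": ...}, "columns": [...], "values": [[...]]}]}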
def fetch_data(query):
payload = {'db': confdb.IDB["database"], 'q': query}
query_url = "{:s}/query".format(confdb.IDB["api_url"])
r = requests.get(query_url, verify=confdb.IDB["ssl"], auth=HTTPBasicAuth(
confdb.IDB["username"], confdb.IDB["password"]), params=payload)
# Note: status codes other than 200 may also carry results; the exact set of good/bad status codes still needs to be determined.
if r.status_code != 200:
raise RuntimeError(
"Couldn't retrieve the data from InfluxDB. Code:", r.status_code)
result = r.json()['results']
return result
# format metric
def format_metric(metrics_dict, metric, value):
if value is None:
return None
mtype = metrics_dict[metric]["type"]
if mtype is metrics.MType.INT:
return int(float(value))
if mtype is metrics.MType.STR:
return str(value)
if mtype is metrics.MType.FLT:
return float(value)
return None
# db/getjob.py gets job information from the file created by postexec script
# or directly from the job system
import conf.config as conf
from . import common
from . import metrics
m_jobs = conf.measurements["jobs"]
def get_job_query(job_id):
query = (
'SELECT last(*) FROM "{:s}" '
'WHERE "jobid" = \'{:s}\';'
).format(m_jobs, job_id)
return query
def parse_job_response(data=None):
job = {}
if len(data) == 0:
raise RuntimeError("Job query returned empty response")
if "series" not in data[0]:
raise RuntimeError("Job query was not executed correctly", data[0])
if len(data[0]['series']) == 0:
raise RuntimeError("Job was not found", data[0])
for i in range(0, len(data[0]['series'][0]['columns'])):
metric = data[0]['series'][0]['columns'][i]
job[metric] = data[0]['series'][0]['values'][0][i]
return job
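# parse_alloc_cu assumes alloc_info is a string of colon-separated "<cu>*<node_id>" entries,
# e.g. "16*node01:16*node02" (format inferred from the parsing below), and returns a
# {node_id: allocated_cu} mapping.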
def parse_alloc_cu(alloc_info):
nodes = {}
nodes_raw = alloc_info.split(":")
for n in nodes_raw:
cu, node_id = n.split("*")
fcu = common.format_metric(metrics.metrics_node, "alloc_cu", cu)
nodes[node_id] = fcu
return nodes
def format_job_info(jobinfo_init):
jobinfo = {}
for name, par in metrics.metrics_job.items():
raw_value = jobinfo_init.get("last_{:s}".format(par["dbname"]), None)
value = common.format_metric(metrics.metrics_job, name, raw_value)
jobinfo[par["apiname"]] = value
return jobinfo
def get_job_data(job_id):
query = get_job_query(job_id)
jobdata_raw = common.fetch_data(query)
jobdata_parsed = parse_job_response(jobdata_raw)
jobinfo = format_job_info(jobdata_parsed)
# Special case: Allocated CU
alloc_cu = parse_alloc_cu(jobinfo["alloc_cu"])
return jobinfo, alloc_cu
# db/getnode.py gets node information (the node measurement) from the DB
# for further usage
import conf.config as conf
from collections import defaultdict
from . import common
from . import metrics
m_node = conf.measurements["node"]
def get_node_query(node_id):
random_field = metrics.metrics_node["sockets"]["dbname"]
query = (
'SELECT last({:s}), * FROM "{:s}" '
'WHERE "host" = \'{:s}\' GROUP BY "host";'
).format(random_field, m_node, node_id)
return query
def get_node_queries(nodes):
queries = ""
for n in nodes:
q = get_node_query(n)
queries += q
return queries
def parse_node_response(data=None):
nodes = defaultdict(dict)
for i in range(0, len(data)):
if "series" not in data[i]:
raise RuntimeError("Some queries were not executed", data[i])
for s in range(0, len(data[i]['series'])):
for m in range(0, len(data[i]['series'][s]['columns'])):
host = data[i]['series'][s]['tags']['host']
metric = data[i]['series'][s]['columns'][m]
value = data[i]['series'][s]['values'][0][m]
nodes[host][metric] = value
return nodes
def format_node_info(nodeinfo_init):
nodeinfo = defaultdict(dict)
for n in nodeinfo_init:
for name, par in metrics.metrics_node.items():
raw_value = nodeinfo_init[n].get(par["dbname"], None)
value = common.format_metric(metrics.metrics_node, name, raw_value)
nodeinfo[n][par["apiname"]] = value
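# The DB provides a single threads_per_core value for both thread metrics:
# report 1 physical thread per core and treat the remaining threads_per_core - 1
# threads as virtual (as done below).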
phname = metrics.metrics_node["phys_thr_core"]["apiname"]
vrname = metrics.metrics_node["virt_thr_core"]["apiname"]
nodeinfo[n][phname] = 1
nodeinfo[n][vrname] -= 1
return nodeinfo
def get_node_data(nodes):
nodes = set(nodes)
queries = get_node_queries(nodes)
nodedata_raw = common.fetch_data(queries)
nodedata_parsed = parse_node_response(nodedata_raw)
nodeinfo = format_node_info(nodedata_parsed)
return nodeinfo
# db/getproc.py gets process metrics from InfluxDB by job ID
# and returns a parsed dictionary
from collections import defaultdict
import conf.config as conf
from . import common
from . import metrics
m_proc = conf.measurements["proc"]
def get_query_sum_of_max(metric, job_id, metric_db):
aggr_outer = 'sum("{:s}_aggr") as "{:s}"'.format(metric, metric)
aggr_inner = 'max("{:s}") as "{:s}_aggr"'.format(metric_db, metric)
query = (
'SELECT {:s} FROM ('
'SELECT {:s}, "host" FROM "{:s}" '
'WHERE "jobid1" = \'{:s}\' '
'GROUP BY "pid") GROUP BY "host";'
).format(aggr_outer, aggr_inner, m_proc, job_id)
return query
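# Example of the generated query (hypothetical job ID "12345", the cpu_time_user
# metric, and the sample measurement name):
# SELECT sum("cpu_time_user_aggr") as "cpu_time_user" FROM (
#   SELECT max("cpu_time_user") as "cpu_time_user_aggr", "host" FROM "pfit-uprocstat"
#   WHERE "jobid1" = '12345' GROUP BY "pid") GROUP BY "host";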
def get_query_sum_per_interval(metric, job_id, aggr, metric_db):
interval = conf.METRIC_INTERVAL
aggr_outer = '{:s}("{:s}_aggr") as "{:s}"'.format(aggr, metric, metric)
aggr_inner = 'sum("{:s}") as "{:s}_aggr"'.format(metric_db, metric)
query = (
'SELECT {:s} FROM ('
'SELECT {:s} '
'FROM "{:s}" WHERE "jobid1" = \'{:s}\' '
'GROUP BY "host", time({:d}s)) GROUP BY "host";'
).format(aggr_outer, aggr_inner, m_proc, job_id, interval)
return query
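# Example (hypothetical job ID "12345", mem_rss_max with the "max" outer aggregation,
# and the sample config with METRIC_INTERVAL = 10):
# SELECT max("mem_rss_max_aggr") as "mem_rss_max" FROM (
#   SELECT sum("memory_rss") as "mem_rss_max_aggr" FROM "pfit-uprocstat"
#   WHERE "jobid1" = '12345' GROUP BY "host", time(10s)) GROUP BY "host";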
def get_query_max_of_sum_per_interval(metric, job_id, metric_db):
return get_query_sum_per_interval(metric, job_id, "max", metric_db)
def get_query_avg_of_sum_per_interval(metric, job_id, metric_db):
return get_query_sum_per_interval(metric, job_id, "mean", metric_db)
def get_proc_queries(job_id):
queries = ""
for m, par in metrics.metrics_proc.items():
if par["query"] is metrics.QType.SUM_OF_MAX:
q = get_query_sum_of_max(m, job_id, par["dbname"])
elif par["query"] is metrics.QType.MAX_SUM_PER_INT:
q = get_query_max_of_sum_per_interval(m, job_id, par["dbname"])
elif par["query"] is metrics.QType.AVG_SUM_PER_INT:
q = get_query_avg_of_sum_per_interval(m, job_id, par["dbname"])
else:
raise ValueError('Unknown query type:', par["query"])
queries += q
return queries
def parse_proc_response(data=None):
nodes = defaultdict(dict)
for i in range(0, len(data)):
if "series" not in data[i]:
raise RuntimeError("Some queries were not executed", data[i])
for s in range(0, len(data[i]['series'])):
metric = data[i]['series'][s]['columns'][1]
host = data[i]['series'][s]['tags']['host']
nodes[host][metric] = data[i]['series'][s]['values'][0][1]
return nodes
def format_data(procinfo_init):
procinfo = defaultdict(dict)
for node, vars in procinfo_init.items():
for name, par in metrics.metrics_proc.items():
raw_value = vars.get(name, None)
value = common.format_metric(metrics.metrics_proc, name, raw_value)
procinfo[node][par["apiname"]] = value
return procinfo
def get_proc_data(job_id):
query = get_proc_queries(job_id)
procdata_raw = common.fetch_data(query)
procdata_parsed = parse_proc_response(procdata_raw)
procdata = format_data(procdata_parsed)
return procdata
from enum import Enum
class QType(Enum):
SUM_OF_MAX = 1
MAX_SUM_PER_INT = 2
AVG_SUM_PER_INT = 3
class MType(Enum):
INT = 1
STR = 2
FLT = 3
class APInames():
NODE_NAME = "node_name"
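# Each metric entry below maps an internal metric name to:
#   "apiname" - key used in the aggregated JSON output,
#   "dbname"  - field name in the InfluxDB measurement,
#   "query"   - aggregation strategy (metrics_proc only, see QType),
#   "type"    - type the raw value is converted to (see MType / format_metric).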
metrics_proc = {
'cpu_time_user': {
"apiname": 'cpu_time_user',
"dbname": 'cpu_time_user',
"query": QType.SUM_OF_MAX,
"type": MType.INT},
'cpu_time_system': {
"apiname": 'cpu_time_system',
"dbname": 'cpu_time_system',
"query": QType.SUM_OF_MAX,
"type": MType.INT},
'cpu_time_idle': {
"apiname": 'cpu_time_idle',
"dbname": 'cpu_time_idle',
"query": QType.SUM_OF_MAX,
"type": MType.INT},
'cpu_time_iowait': {
"apiname": 'cpu_time_iowait',
"dbname": 'cpu_time_iowait',
"query": QType.SUM_OF_MAX,
"type": MType.INT},
'write_bytes': {
"apiname": 'write_bytes',
"dbname": 'write_bytes',
"query": QType.SUM_OF_MAX,
"type": MType.INT},
'write_count': {
"apiname": 'write_count',
"dbname": 'write_count',
"query": QType.SUM_OF_MAX,
"type": MType.INT},
'read_bytes': {
"apiname": 'read_bytes',
"dbname": 'read_bytes',
"query": QType.SUM_OF_MAX,
"type": MType.INT},
'read_count': {
"apiname": 'read_count',
"dbname": 'read_count',
"query": QType.SUM_OF_MAX,
"type": MType.INT},
'mem_rss_max': {
"apiname": 'mem_rss_max',
"dbname": 'memory_rss',
"query": QType.MAX_SUM_PER_INT,
"type": MType.INT},
'mem_rss_avg': {
"apiname": 'mem_rss_avg',
"dbname": 'memory_rss',
"query": QType.AVG_SUM_PER_INT,
"type": MType.INT},
'mem_swap_max': {
"apiname": 'mem_swap_max',
"dbname": 'memory_swap',
"query": QType.MAX_SUM_PER_INT,
"type": MType.INT},
'mem_vms': {
"apiname": 'mem_vms',
"dbname": 'memory_vms',
"query": QType.MAX_SUM_PER_INT,
"type": MType.INT},
'cpu_usage': {
"apiname": 'cpu_usage',
"dbname": 'cpu_usage',
"query": QType.MAX_SUM_PER_INT,
"type": MType.INT},
}
metrics_job = {
"user_name": {
"apiname": "user_name",
"dbname": "user_name",
"type": MType.STR},
"used_queue": {
"apiname": "used_queue",
"dbname": "used_queue",
"type": MType.STR},
"submit_time": {
"apiname": "submit_time",
"dbname": "submit_time",
"type": MType.INT},
"start_time": {
"apiname": "start_time",
"dbname": "start_time",
"type": MType.INT},
"end_time": {
"apiname": "end_time",
"dbname": "end_time",
"type": MType.INT},
"run_time": {
"apiname": "used_time",
"dbname": "run_time",
"type": MType.INT},
"requested_time": {
"apiname": "requested_time",
"dbname": "requested_time",
"type": MType.INT},
"requested_cu": {
"apiname": "requested_cu",
"dbname": "requested_cu",
"type": MType.INT},
"num_nodes": {
"apiname": "num_nodes",
"dbname": "num_nodes",
"type": MType.INT},
"alloc_cu": {
"apiname": "alloc_cu",
"dbname": "alloc_cu",
"type": MType.STR},
}
metrics_node = {
"cpu_model": {
"apiname": "cpu_model",
"dbname": "cpu_model",
"type": MType.STR},
"sockets": {
"apiname": "sockets",
"dbname": "sockets",
"type": MType.INT},
"cores_per_socket": {
"apiname": "cores_per_socket",
"dbname": "cores_per_socket",
"type": MType.INT},
"phys_thr_core": {
"apiname": "phys_threads_per_core",
"dbname": "threads_per_core",
"type": MType.INT},
"virt_thr_core": {
"apiname": "virt_threads_per_core",
"dbname": "threads_per_core",
"type": MType.INT},
"main_mem": {
"apiname": "available_main_mem",
"dbname": "main_mem",
"type": MType.INT},
"alloc_cu": {
"apiname": "num_cores",
"dbname": "alloc_cu",
"type": MType.INT},
}
import subprocess
import csv
import re
import time
from datetime import datetime
from io import StringIO
import requests