# influxdb_wrapper.py
from .influxdb_fetchjob import get_job_data
from .influxdb_fetchproc import get_proc_data
from .influxdb_fetchnode import get_node_data
4
from .influxdb_fetchseq import get_seq_data
5
from ..aggrstruct import *
6
7
8
9
10
11
12
13
14
15
16

def fetch_all(job_id, type):
    """Collect the raw job, process and node data for one job.

    Args:
        job_id: Identifier of the job; passed unchanged to the fetch helpers.
        type: Requested output kind. Only the value ``"pdf"`` changes what is
            fetched: it additionally loads the sequence data. (The name
            shadows the builtin but is kept for caller compatibility.)

    Returns:
        A dict with the keys ``"job"``, ``"alloc_cu"``, ``"proc"`` and
        ``"node"``; when ``type == "pdf"`` it also contains ``"seq"`` and
        ``"seq_delta"``.
    """
    data = {}
    data["job"], data["alloc_cu"] = get_job_data(job_id)
    data["proc"] = get_proc_data(job_id)

    # Node data is requested only for the nodes that have process data.
    node_ids = list(data["proc"].keys())

    data["node"] = get_node_data(node_ids)

    if type == "pdf":
        # The sequence query window spans start_time .. start_time + run_time.
        t_start = data["job"]["start_time"]
        t_end = t_start + data["job"]["run_time"]
        data["seq"], data["seq_delta"] = get_seq_data(job_id, t_start, t_end)

    return data

def get_aggregator(job_id, type="ascii"):
    """Build an ``Aggregator`` filled with the data fetched for ``job_id``.

    Args:
        job_id: Identifier of the job; forwarded to ``fetch_all`` and stored
            as ``aggr.job.id``.
        type: Requested output kind, forwarded to ``fetch_all``. With
            ``"pdf"`` every node additionally gets the sequence values
            ``seq_cpu_usage`` and ``seq_mem_rss_sum``. (The name shadows the
            builtin but is kept for caller compatibility.)

    Returns:
        The populated ``Aggregator``: its ``job`` fields, the
        ``job.alloc_cu`` list and the ``nodes`` list are set here.

    Note:
        ``Aggregator``, ``AllocCUData``, ``NodeData`` and ``SeqVals`` are
        presumably provided by the ``..aggrstruct`` star import -- confirm.
    """
    data = fetch_all(job_id, type)

    aggr = Aggregator()

    jd = data["job"]
    cud = data["alloc_cu"]

    aggr.job.id = job_id
    aggr.job.user_name = jd["user_name"]
    aggr.job.used_queue = jd["used_queue"]
    aggr.job.submit_time = jd["submit_time"]
    aggr.job.start_time = jd["start_time"]
    # end_time is not read from the job record; it is derived here.
    aggr.job.end_time = jd["start_time"] + jd["run_time"]
    aggr.job.run_time = jd["run_time"]
    aggr.job.requested_time = jd["requested_time"]
    aggr.job.requested_cu = jd["requested_cu"]
    aggr.job.num_nodes = jd["num_nodes"]

    aggr.job.alloc_cu = [
        AllocCUData(node_id, cu_count) for node_id, cu_count in cud.items()
    ]

    aggr.nodes = []
    for node_id, noded in data["node"].items():
        new_node = NodeData()

        new_node.name = node_id
        new_node.cpu_model = noded["cpu_model"]
        new_node.sockets = noded["sockets"]
        new_node.cores_per_socket = noded["cores_per_socket"]
        new_node.virt_thr_core = noded["virt_thr_core"]
        new_node.phys_thr_core = noded["phys_thr_core"]
        new_node.main_mem = noded["main_mem"]

        # Process statistics are looked up under the same node id.
        procd = data["proc"][node_id]

        new_node.proc.cpu_time_user = procd["cpu_time_user"]
        new_node.proc.cpu_time_system = procd["cpu_time_system"]
        new_node.proc.cpu_time_idle = procd["cpu_time_idle"]
        new_node.proc.cpu_time_iowait = procd["cpu_time_iowait"]
        new_node.proc.write_bytes = procd["write_bytes"]
        new_node.proc.write_count = procd["write_count"]
        new_node.proc.read_bytes = procd["read_bytes"]
        new_node.proc.read_count = procd["read_count"]
        new_node.proc.mem_rss_max = procd["mem_rss_max"]
        new_node.proc.mem_rss_avg = procd["mem_rss_avg"]
        new_node.proc.mem_swap_max = procd["mem_swap_max"]
        new_node.proc.mem_vms = procd["mem_vms"]
        new_node.proc.cpu_usage = procd["cpu_usage"]

        if type == "pdf":
            # "seq" / "seq_delta" are only fetched by fetch_all in pdf mode.
            new_node.proc.seq_cpu_usage = SeqVals(data["seq_delta"], data["seq"][node_id]["proc_cpu_usage_sum"])
            new_node.proc.seq_mem_rss_sum = SeqVals(data["seq_delta"], data["seq"][node_id]["proc_mem_rss_sum"])

        aggr.nodes.append(new_node)

    return aggr