influxdb_wrapper.py 9.06 KB
Newer Older
1
import conf.config as conf
2
3
4
from .influxdb_fetchjob import get_job_data
from .influxdb_fetchproc import get_proc_data
from .influxdb_fetchnode import get_node_data
5
from .influxdb_fetchseq import get_seq_data
6
from .influxdb_fetchgpu import get_gpu_data
7
from ..aggrstruct import *
8
from . import common
9
10
11

def fetch_all(job_id, type):
    """Collect the raw InfluxDB datasets needed to build a job report.

    The returned dict always contains the keys ``"job"``, ``"alloc_cu"`` and
    ``"alloc_mem"`` (from ``get_job_data``), ``"proc"`` (from
    ``get_proc_data``) and ``"node"`` (from ``get_node_data``).
    When ``type == "pdf"`` it also contains ``"seq"`` and ``"seq_delta"``
    (from ``get_seq_data``). When ``conf.GPU is True`` it also contains
    ``"gpu"`` (from ``get_gpu_data``).

    Node, sequence and GPU queries are restricted to the nodes that have
    process data, over the window ``[start_time, start_time + run_time]``
    taken from the job record.

    If the job ran for less than ``conf.MIN_DUR`` minutes, an error message
    is printed via ``common.eprint``, but data collection still continues.

    :param job_id: identifier of the job to fetch.
    :param type: report type; ``"pdf"`` additionally fetches sequence data.
    :returns: dict of raw datasets, keyed as described above.
    """
    job_info, cu_alloc, mem_alloc = get_job_data(job_id)
    data = {"job": job_info, "alloc_cu": cu_alloc, "alloc_mem": mem_alloc}

    # Query window for all time-based measurements.
    window_begin = job_info["start_time"]
    window_finish = window_begin + job_info["run_time"]

    # conf.MIN_DUR is expressed in minutes; run_time in seconds.
    shortest_allowed = conf.MIN_DUR * 60
    if job_info["run_time"] < shortest_allowed:
        message = "The runtime of the job ({dur:d} seconds) is too short. Minimum is {min:d} seconds".format(
            dur=int(job_info["run_time"]),
            min=int(shortest_allowed),
        )
        common.eprint(message)

    proc_info = get_proc_data(job_id)
    data["proc"] = proc_info

    # Every later query is limited to the nodes that reported process data.
    hosts = list(proc_info.keys())

    data["node"] = get_node_data(hosts, window_begin, window_finish)

    if type == "pdf":
        seq_values, seq_step = get_seq_data(job_id, window_begin, window_finish, hosts)
        data["seq"] = seq_values
        data["seq_delta"] = seq_step

    if conf.GPU is True:
        data["gpu"] = get_gpu_data(job_id, window_begin, window_finish, hosts)

    return data

Azat Khuziyakhmetov's avatar
Azat Khuziyakhmetov committed
36
def get_aggregator(job_id, type="text"):
    """Build an ``Aggregator`` for ``job_id`` from the data returned by ``fetch_all``.

    Only nodes that appear in both the node data and the CU allocation are
    reported, in sorted order of node id.

    :param job_id: identifier of the job.
    :param type: report type passed through to ``fetch_all``; ``"pdf"``
        additionally fills the per-node and per-GPU sequence (``seq_*``) fields.
    :returns: the populated ``Aggregator`` instance.
    """
    data = fetch_all(job_id, type)

    aggr = Aggregator()
    _fill_job(aggr.job, job_id, data["job"], data["alloc_cu"])

    aggr.nodes = []
    # data["node"] may contain nodes without a CU allocation entry; skip them.
    for node_id in sorted(data["node"]):
        if node_id not in data["alloc_cu"]:
            continue
        aggr.nodes.append(_build_node(node_id, data, type))

    return aggr


# Factor applied to InfiniBand values to obtain bytes (summary and sequences).
_IB_TO_BYTES = 4
# Factor applied to BeeGFS summary sums to obtain bytes.
_BEEGFS_TO_BYTES = 1024
# Factor applied to BeeGFS sequence values.
# NOTE(review): this differs from _BEEGFS_TO_BYTES (1024), and the original
# comment at this spot was copied from the InfiniBand block. Kept at 4 to
# preserve behaviour; confirm the units produced by get_seq_data.
_BEEGFS_SEQ_FACTOR = 4


def _scaled(value, factor):
    """Return ``value * factor``, or ``None`` when ``value`` is ``None``."""
    return None if value is None else value * factor


def _scale_seq(seqvals, factor):
    """Multiply every non-``None`` sample of ``seqvals.seq`` by ``factor`` in place.

    Does nothing when the whole sequence is ``None``.
    """
    if seqvals.seq is not None:
        seqvals.seq = [None if x is None else (factor * x) for x in seqvals.seq]


def _fill_job(job, job_id, jd, cud):
    """Copy job metadata ``jd`` and the per-node CU allocation ``cud`` onto ``job``."""
    job.id = job_id
    job.user_name = jd.get("user_name")
    job.used_queue = jd.get("used_queue")
    job.submit_time = jd.get("submit_time")
    job.start_time = jd.get("start_time")
    job.end_time = jd.get("start_time") + jd.get("run_time")
    job.run_time = jd.get("run_time")
    job.requested_time = jd.get("requested_time")
    job.requested_cu = jd.get("requested_cu")
    job.num_nodes = jd.get("num_nodes")
    job.exit_code = jd.get("exit_code")
    job.alloc_cu = [AllocCUData(node_id, cu_count) for node_id, cu_count in cud.items()]


def _alloc_mem(node_id, noded, cud, memd):
    """Return the memory allocated to ``node_id``.

    Uses the explicit value from ``memd`` when it is present and positive.
    Otherwise it estimates the allocation as the node's ``main_mem`` share
    proportional to allocated CUs over ``sockets * cores_per_socket *
    virt_thr_core``, divided by 1e6. The original code marks the estimate
    as MB, which suggests ``main_mem`` is in bytes (TODO confirm).
    """
    amem = memd.get(node_id)
    # A missing/None entry previously raised TypeError on the comparison.
    if amem is not None and amem > 0:
        return amem
    ncpus = noded.get("sockets") * noded.get("cores_per_socket") * noded.get("virt_thr_core")
    return int(cud.get(node_id) * noded.get("main_mem") / ncpus / 1000000)  # in MB


def _build_gpu(bus_id, busd):
    """Create a ``GPUData`` for one GPU bus from its summary metrics ``busd``."""
    gpu = GPUData()
    gpu.bus = bus_id
    gpu.power_limit = busd["gpu_power_limit"]
    gpu.mem = busd["gpu_memory_total"]
    gpu.mem_max = busd["gpu_memory_max"]
    gpu.temp_max = busd["gpu_temperature_max"]
    gpu.power_max = busd["gpu_power_max"]
    gpu.usage_max = busd["gpu_utilization_max"]
    gpu.usage_avg = busd["gpu_utilization_avg"]
    gpu.cpu_usage_max = busd["gpu_cpu_usage_max"]
    gpu.cpu_mem_rss_max = busd["gpu_cpu_mem_max"]
    gpu.cpu_proc_total = busd["gpu_cpu_proc_count_total"]
    return gpu


def _build_node(node_id, data, report_type):
    """Create the ``NodeData`` for ``node_id`` from the ``fetch_all`` result ``data``."""
    noded = data["node"][node_id]
    cud = data["alloc_cu"]

    node = NodeData()
    node.name = node_id

    node.cpu_model = noded.get("cpu_model")
    node.sockets = noded.get("sockets")
    node.cores_per_socket = noded.get("cores_per_socket")
    node.virt_thr_core = noded.get("virt_thr_core")
    node.phys_thr_core = noded.get("phys_thr_core")
    node.main_mem = noded.get("main_mem")
    node.alloc_cu = cud.get(node_id)
    node.alloc_mem = _alloc_mem(node_id, noded, cud, data["alloc_mem"])

    # for now we just account hyperthreading
    node.phys_thr_core = 1
    node.virt_thr_core -= 1

    if conf.INFINIBAND is True:
        node.ib_rcv_max = _scaled(noded.get("ib_rcv_max"), _IB_TO_BYTES)
        node.ib_xmit_max = _scaled(noded.get("ib_xmit_max"), _IB_TO_BYTES)

    if conf.BEEGFS is True:
        node.beegfs_read_sum = _scaled(noded.get("beegfs_read_sum"), _BEEGFS_TO_BYTES)
        node.beegfs_write_sum = _scaled(noded.get("beegfs_write_sum"), _BEEGFS_TO_BYTES)

    procd = data["proc"].get(node_id)
    node.proc.cpu_time_user = procd.get("cpu_time_user")
    node.proc.cpu_time_system = procd.get("cpu_time_system")
    node.proc.cpu_time_idle = procd.get("cpu_time_idle")
    node.proc.cpu_time_iowait = procd.get("cpu_time_iowait")
    node.proc.write_bytes = procd.get("write_bytes")
    node.proc.write_count = procd.get("write_count")
    node.proc.read_bytes = procd.get("read_bytes")
    node.proc.read_count = procd.get("read_count")
    node.proc.mem_rss_max = procd.get("mem_rss_max")
    node.proc.mem_rss_avg = procd.get("mem_rss_avg")
    node.proc.mem_swap_max = procd.get("mem_swap_max")
    node.proc.mem_vms = procd.get("mem_vms")
    node.proc.cpu_usage = procd.get("cpu_usage")

    # gpud stays None when GPU support is disabled or the node has no GPU data.
    gpud = None
    node.gpus = []
    if conf.GPU is True:
        gpud = data["gpu"].get(node_id, None)
        if gpud is not None:
            node.gpus = [_build_gpu(bus_id, busd) for bus_id, busd in gpud.items()]

    if report_type == "pdf":
        # A node without sequence data yields None series instead of KeyError.
        seqd = data["seq"].get(node_id, {})
        _fill_node_seq(node, seqd, data["seq_delta"], gpud)

    return node


def _fill_node_seq(node, seqd, delta, gpud):
    """Attach the sequence data ``seqd`` (step ``delta``) to ``node`` and its GPUs."""
    node.proc.seq_cpu_usage = SeqVals(delta, seqd.get("proc_cpu_usage_sum"))
    node.proc.seq_mem_rss_sum = SeqVals(delta, seqd.get("proc_mem_rss_sum"))
    node.proc.seq_write_sum = SeqVals(delta, seqd.get("proc_write_sum"))
    node.proc.seq_read_sum = SeqVals(delta, seqd.get("proc_read_sum"))

    # Node CPU usage is derived from the idle percentage: usage = 100 - idle.
    node.seq_cpu_usage = SeqVals(delta, seqd.get("node_cpu_idle_avg"))
    if node.seq_cpu_usage.seq is not None:
        node.seq_cpu_usage.seq = [None if x is None else (100 - x) for x in node.seq_cpu_usage.seq]

    node.seq_load_avg = SeqVals(delta, seqd.get("node_load_avg"))
    node.seq_load_max = SeqVals(delta, seqd.get("node_load_max"))

    if conf.INFINIBAND is True:
        node.seq_ib_rcv_max = SeqVals(delta, seqd.get("node_ib_rcv_max"))
        node.seq_ib_xmit_max = SeqVals(delta, seqd.get("node_ib_xmit_max"))
        _scale_seq(node.seq_ib_rcv_max, _IB_TO_BYTES)
        _scale_seq(node.seq_ib_xmit_max, _IB_TO_BYTES)

    if conf.BEEGFS is True:
        node.seq_beegfs_read_sum = SeqVals(delta, seqd.get("node_beegfs_read_sum"))
        node.seq_beegfs_write_sum = SeqVals(delta, seqd.get("node_beegfs_write_sum"))
        _scale_seq(node.seq_beegfs_read_sum, _BEEGFS_SEQ_FACTOR)
        _scale_seq(node.seq_beegfs_write_sum, _BEEGFS_SEQ_FACTOR)

    if gpud is not None:
        for gpu in node.gpus:
            busd = gpud[gpu.bus]
            gpu.seq_mem_avg = SeqVals(delta, busd["gpu_memory_avg_seq"])
            gpu.seq_usage_avg = SeqVals(delta, busd["gpu_utilization_avg_seq"])
            gpu.seq_temp_avg = SeqVals(delta, busd["gpu_temperature_avg_seq"])
            gpu.seq_power_avg = SeqVals(delta, busd["gpu_power_avg_seq"])
            gpu.seq_cpu_usage_sum = SeqVals(delta, busd["gpu_cpu_usage_sum_seq"])
            gpu.seq_cpu_mem_rss_sum = SeqVals(delta, busd["gpu_cpu_mem_sum_seq"])
            gpu.seq_cpu_proc_count = SeqVals(delta, busd["gpu_cpu_proc_count_sum_seq"])