data.py 6.5 KB
Newer Older
1
2
3
#! /usr/bin/env python3

import argparse
import gzip
import io
import json
import os
import re
import shlex
import subprocess
import sys

import db.aggregator as aggregator
import conf.config as conf
from format import format
from rcm import rcm
14

Azat Khuziyakhmetov's avatar
Azat Khuziyakhmetov committed
15
16
17
def cache_file(jobid, filename, content):
    """Write ``content`` gzip-compressed into the JSON cache directory.

    Args:
        jobid: Job ID the content belongs to. Not used by this function;
            kept so existing callers keep working.
        filename: Name of the cache file inside ``conf.CACHEDIR``.
        content: Text to store; it is encoded with ``str.encode()`` before
            being compressed.

    Returns:
        ``False`` when caching is disabled (``conf.CACHEJSON`` is ``False``
        or ``conf.CACHEDIR`` is empty), ``True`` once the file was written.
    """
    if conf.CACHEJSON is False or conf.CACHEDIR == "":
        return False

    # exist_ok avoids the check-then-create race between concurrent runs.
    os.makedirs(conf.CACHEDIR, mode=0o770, exist_ok=True)

    cachefile = conf.CACHEDIR + "/" + filename

    # O_TRUNC matters: the file may already exist (e.g. the "init"
    # placeholder or an older report). Without truncation, a shorter new
    # payload would leave stale bytes behind the fresh gzip stream.
    fd = os.open(cachefile, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o660)

    # GzipFile.close() does not close the underlying file object, so the
    # FileIO wrapper is managed explicitly to release the descriptor.
    with io.FileIO(fd, mode='wb') as raw:
        with gzip.GzipFile(fileobj=raw, mode='wb') as gz:
            gz.write(content.encode())

    return True

def cache_data(jobid, type, content):
    """Store the JSON of one report type in the cache.

    Args:
        jobid: Job ID (string) used to build the cache file name.
        type: Report type, either ``"text"`` or ``"pdf"``.
        content: JSON text to cache.

    Returns:
        ``False`` when caching is disabled (``conf.CACHEJSON`` is ``False``
        or ``conf.CACHEDIR`` is empty); otherwise the result of
        ``cache_file`` for the derived file name.

    Raises:
        RuntimeError: If caching is enabled and ``type`` is neither
            ``"text"`` nor ``"pdf"``.
    """
    if conf.CACHEJSON is False or conf.CACHEDIR == "":
        return False

    if type == "text":
        filename = "{:s}.txt.json.gz".format(jobid)
    elif type == "pdf":
        filename = "{:s}.pdf.json.gz".format(jobid)
    else:
        raise RuntimeError("invalid type of report: {}".format(type))

    # Propagate the outcome so callers can tell success from "disabled".
    return cache_file(jobid, filename, content)


def merge_and_out(job_id, aggr, rcm, type, cache, out_dir=None):
    """Produce the JSON for one report type and print it or write it to a file.

    When ``cache[type]`` is ``False`` the JSON is built from ``aggr`` via
    ``format.format`` with ``rcm`` stored under the ``"recommendations"``
    key, and is cached if ``conf.CACHEJSON`` is ``True``. Otherwise the
    cached string in ``cache[type]`` is used as is.

    Args:
        job_id: Job ID (string); used for caching and the output file name.
        aggr: Aggregated job data handed to ``format.format``; only used
            when there is no cached JSON for ``type``.
        rcm: Recommendations to embed in the generated JSON.
        type: Report type; must be a key of ``cache``.
        cache: Mapping of report type to a cached JSON string, or ``False``
            when nothing is cached for that type.
        out_dir: Directory to write ``<job_id>.<type>.json`` into. When
            ``None`` the JSON is printed to standard output instead.
    """
    if cache[type] is False:
        formatted = format.format(aggr, type)
        formatted["recommendations"] = rcm
        jsonstr = json.dumps(formatted)

        if conf.CACHEJSON is True:
            cache_data(job_id, type, jsonstr)
    else:
        jsonstr = cache[type]

    if out_dir is None:
        print(jsonstr)
        return

    # Create the target directory on demand so a not-yet-existing output
    # directory (such as the "./out" default) does not abort the export.
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    filename = "{:s}/{:s}.{:s}.json".format(out_dir, job_id, type)
    with open(filename, 'w') as outfile:
        outfile.write(jsonstr)

    print("{:s} data was exported in {:s}".format(type, filename))

66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def check_user(jobid):
    """Terminate the program unless the calling user owns the job.

    Root (real or effective UID 0) is always allowed. For other users the
    batch-system specific command from ``conf.job_uid_comm`` is run through
    bash, the first run of digits in its output is taken as the job owner's
    UID, and it is compared with the real UID of this process.

    Args:
        jobid: Job ID as given on the command line.

    Exits with status 1 when no supported batch system is configured, when
    no UID can be parsed from the command output, or when the UIDs differ.
    """
    euid = os.geteuid()
    uid = os.getuid()

    if euid == 0 or uid == 0:
        return

    # The job ID is user input that ends up in a shell command line, so it
    # is quoted to keep shell metacharacters from being interpreted.
    # NOTE(review): assumes the command templates in conf do not already
    # wrap the {jobid} placeholder in quotes - confirm against the config.
    safe_jobid = shlex.quote(str(jobid))

    if conf.BATCH_SYSTEM == "SLURM":
        job_uid_comm = conf.job_uid_comm["slurm"].format(jobid=safe_jobid)
    elif conf.BATCH_SYSTEM == "LSF":
        job_uid_comm = conf.job_uid_comm["lsf"].format(jobid=safe_jobid)
    else:
        print("Cannot check the user ID, no batch system specified")
        sys.exit(1)

    result = subprocess.run(job_uid_comm, stdout=subprocess.PIPE, shell=True,
                            executable='/bin/bash')
    out = result.stdout.decode("utf-8")
    m = re.search("[0-9]+", out)

    # A successful match always has a group 0, so only the match is checked.
    if m is None:
        print("Cannot parse UID. {:s}".format(out))
        sys.exit(1)

    job_uid = m.group(0)

    if int(job_uid) != int(uid):
        print("Access denied. UIDs of the user and the job do not match.")
        sys.exit(1)

    return

98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def job_id_transform(jobid):
    """Translate an array-style job ID (containing ``_``) into a raw job ID.

    IDs without an underscore are returned unchanged, as are all IDs when
    the batch system is LSF. For SLURM the command from
    ``conf.jobid_array_trans_comm`` is run through bash and the first run
    of digits in its output is returned.

    Args:
        jobid: Job ID as given on the command line.

    Returns:
        The job ID to use for further lookups, as a string.

    Exits with status 1 when no supported batch system is configured or
    when no job ID can be parsed from the command output.
    """
    if "_" not in jobid:
        return jobid

    if conf.BATCH_SYSTEM == "SLURM":
        # The job ID is user input that ends up in a shell command line, so
        # it is quoted to keep shell metacharacters from being interpreted.
        # NOTE(review): assumes the command template does not already wrap
        # the {jobid} placeholder in quotes - confirm against the config.
        jobid_comm = conf.jobid_array_trans_comm["slurm"].format(
            jobid=shlex.quote(str(jobid)))
    elif conf.BATCH_SYSTEM == "LSF":
        return jobid
    else:
        print("Cannot transform JOBID, no batch system specified")
        sys.exit(1)

    result = subprocess.run(jobid_comm, stdout=subprocess.PIPE, shell=True,
                            executable='/bin/bash')
    out = result.stdout.decode("utf-8")
    m = re.search("[0-9]+", out)

    # A successful match always has a group 0, so only the match is checked.
    if m is None:
        print("Cannot parse JOBID. {:s}".format(out))
        sys.exit(1)

    return m.group(0)

Azat Khuziyakhmetov's avatar
Azat Khuziyakhmetov committed
123
124
125
126
127
128
129
130
131
132
133
def check_cache_file(jobid, filename):
    """Return cached JSON text for ``filename`` or reserve the cache slot.

    If the cache file exists, its decompressed text is returned unless it
    is empty or the ``"init"`` placeholder. If it does not exist, a
    placeholder file containing ``"init"`` is created.

    Args:
        jobid: Job ID the cache entry belongs to. Not used by this
            function; kept so existing callers keep working.
        filename: Name of the cache file inside ``conf.CACHEDIR``.

    Returns:
        The cached text, or ``False`` when caching is disabled
        (``conf.CACHEJSON`` is ``False`` or ``conf.CACHEDIR`` is empty) or
        no usable cached content exists.
    """
    if conf.CACHEJSON is False or conf.CACHEDIR == "":
        return False

    # exist_ok avoids the check-then-create race between concurrent runs.
    os.makedirs(conf.CACHEDIR, mode=0o770, exist_ok=True)

    cachefile = conf.CACHEDIR + "/" + filename

    if os.path.exists(cachefile):
        # The context manager closes the handle, which was previously leaked.
        with gzip.open(cachefile, "rt") as f:
            content = f.read()
        if content != "init" and content != "":
            return content
        return False

    fd = os.open(cachefile, os.O_CREAT | os.O_WRONLY, 0o660)
    # GzipFile.close() does not close the underlying file object, so the
    # FileIO wrapper is managed explicitly to release the descriptor.
    with io.FileIO(fd, mode='wb') as raw:
        with gzip.GzipFile(fileobj=raw, mode='wb') as gz:
            gz.write("init".encode())

    return False

def check_cache_txt(jobid):
    """Check the cache for the text-report JSON of ``jobid``.

    Returns whatever ``check_cache_file`` returns for the file
    ``<jobid>.txt.json.gz``.
    """
    return check_cache_file(jobid, "{:s}.txt.json.gz".format(jobid))

def check_cache_pdf(jobid):
    """Check the cache for the PDF-report JSON of ``jobid``.

    Returns whatever ``check_cache_file`` returns for the file
    ``<jobid>.pdf.json.gz``.
    """
    return check_cache_file(jobid, "{:s}.pdf.json.gz".format(jobid))

154
155
def main():
    """Parse the command line and output the report JSON for one job.

    Steps: optionally verify that the caller owns the job
    (``conf.SECUSER``), optionally translate an array job ID
    (``conf.JOBID_ARRAY_TRANSFORM``), look for cached JSON
    (``conf.CACHEJSON``), aggregate the job data if any requested report
    type is not cached, and finally print or export each requested type.

    Returns:
        ``0`` on success. Argument and permission errors terminate the
        process with a non-zero exit status.
    """
    parser = argparse.ArgumentParser(description="""
        Gets the job information required for generating text or PDF reports
        and outputs it in JSON format.
    """, formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument("-t", "--type", help="type of the output",
                        choices=['text', 'pdf', 'all'], default="text")

    parser.add_argument("-o", "--output-dir", help="output directory",
                        const="./out", required=False, nargs='?')

    parser.add_argument("JOBID",
                        help="job ID used in the batch system")

    args = parser.parse_args()

    job_id = args.JOBID

    if conf.SECUSER:
        check_user(job_id)

    # Two JSON documents cannot share standard output, so "all" needs an
    # output directory. This is an error, hence a non-zero exit status like
    # the other error exits in this file.
    if args.output_dir is None and args.type == "all":
        print("Cannot print both data in STDOUT")
        sys.exit(1)

    if conf.JOBID_ARRAY_TRANSFORM:
        job_id = job_id_transform(job_id)

    # Report types to produce, in output order.
    requested = ["text", "pdf"] if args.type == "all" else [args.type]

    # initial values
    need_to_aggregate = True
    cache = {"text": False, "pdf": False}
    aggr = None
    recommendations = None

    # Check if we can use cached data; aggregation is only needed when at
    # least one requested type has no cached JSON.
    if conf.CACHEJSON is True:
        cache_checks = {"text": check_cache_txt, "pdf": check_cache_pdf}
        for kind in requested:
            cache[kind] = cache_checks[kind](job_id)
        need_to_aggregate = any(cache[kind] is False for kind in requested)

    if need_to_aggregate is True:
        # The aggregator is always requested with the "pdf" type, also for
        # text reports. NOTE(review): presumably the pdf data covers what
        # the text report needs - confirm against the aggregator.
        aggr = aggregator.get_aggregator(job_id, "pdf")
        try:
            recommendations = rcm.get_recommendations(aggr)
        except Exception:
            # Best effort: a failure here must not prevent the report, but
            # SystemExit/KeyboardInterrupt are no longer swallowed.
            recommendations = ["Error occured during recommendations generation"]

    # With no output directory, merge_and_out prints to standard output.
    for kind in requested:
        merge_and_out(job_id, aggr, recommendations, kind, cache,
                      args.output_dir)

    return 0

if __name__ == "__main__":
    # Use main()'s return value as the process exit status instead of
    # discarding it.
    sys.exit(main())