Commit 515ede26 authored by Carsten's avatar Carsten
Browse files

debugging logging and rq.

parent 456a0d16
Pipeline #258327 failed with stages
in 6 minutes and 11 seconds
......@@ -6,12 +6,22 @@ from flask_wtf.csrf import CSRFProtect
from flask_mail import Mail
from flask_mongoengine import MongoEngine, MongoEngineSessionInterface
from redis import Redis
import datetime
import logging
import datetime
logger = logging.getLogger('rarefan')
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(module)s: %(message)s')
timestamp = datetime.datetime.now().strftime(format="%Y%m%d-%H%M%S")
handler = logging.FileHandler("/tmp/rarefan.log".format(timestamp))
handler.setFormatter(formatter)
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)
app = Flask(__name__, instance_relative_config=True, static_url_path='/static')
upload_dir = os.path.join(app.static_folder, 'uploads')
app.testing = app.debug = False
app.testing = app.debug = True
# email
app.config.from_object(Config)
......
......@@ -21,9 +21,12 @@ from app.tasks.rarefan import rarefan_task
from app.tasks.tree import tree_task, empty_task
from app.tasks.zip import zip_task
from app.tasks.email import email_task
from app.tasks.redis_tests import example
from app.callbacks.callbacks import on_success, on_failure
import logging
import datetime
import copy
import os
import shlex
......@@ -31,19 +34,9 @@ import shutil
import stat
import subprocess
import tempfile
import logging
from Bio import SeqIO
import datetime
logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(module)s: %(message)s')
timestamp = datetime.datetime.now().strftime(format="%Y%m%d-%H%M%S")
handler = logging.FileHandler("/tmp/rarefan.log".format(timestamp))
handler.setFormatter(formatter)
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger = logging.getLogger('rarefan')
def get_status_code(run_id_path):
# Check if the run has finished.
......@@ -62,13 +55,13 @@ def validate_fasta(filename):
:param filename: The filename of the file to validate.
"""
logging.info("Validating fasta file %s.", filename)
logger.info("Validating fasta file %s.", filename)
with open(filename, 'r') as fp:
fasta = SeqIO.parse(fp, "fasta")
is_fasta = any(fasta)
if not is_fasta:
logging.warning("%s is not a valid fasta file.", filename)
logger.warning("%s is not a valid fasta file.", filename)
return is_fasta
......@@ -76,10 +69,9 @@ def validate_fasta(filename):
def index():
return render_template("index.html")
@app.route('/upload', methods=['GET', 'POST'])
def upload():
logging.debug("upload/%s", request.method)
logger.debug("upload/%s", request.method)
if request.method == 'POST': #upload_form.validate_on_submit():
session['tmpdir'] = tempfile.mkdtemp(
suffix=None,
......@@ -88,7 +80,7 @@ def upload():
)
seqs = [v for k,v in request.files.items() if k.startswith('file')]
logging.info("Uploading %s.", str(seqs))
logger.info("Uploading %s.", str(seqs))
dna_extensions = ['fn', 'fna', 'fastn', 'fas', 'fasta']
aa_extensions = ['fa', 'faa']
......@@ -116,7 +108,7 @@ def upload():
session['tree_names'] = tree_names
for k,v in session.items():
logging.debug("session[%s] = %s", k, str(v))
logger.debug("session[%s] = %s", k, str(v))
return redirect(url_for('submit', _method='GET'))
......@@ -147,7 +139,7 @@ def upload():
@app.route('/submit', methods=['GET', 'POST'])
def submit():
logging.debug("submit/%s", request.method)
logger.debug("submit/%s", request.method)
submit_form = SubmitForm()
strain_names = session.get('strain_names')
......@@ -159,13 +151,13 @@ def submit():
tmpdir = session['tmpdir']
session['outdir'] = os.path.join(tmpdir, 'out')
logging.debug("tmpdir: %s", tmpdir)
logging.debug("tmpdir: %s", session['tmpdir'])
logger.debug("tmpdir: %s", tmpdir)
logger.debug("tmpdir: %s", session['tmpdir'])
# If there is already an outdir, this must be a rerun!
old_run_id = None
if os.path.isdir(session['outdir']):
logging.warning("Rerun, creating new directory")
logger.warning("Rerun, creating new directory")
rerun_tmpdir = tempfile.mkdtemp(
suffix=None,
prefix="",
......@@ -195,12 +187,12 @@ def submit():
session['analyse_repins'] = request.form.get('analyse_repins')
session['email'] = request.form.get('email', None)
logging.info("Session parameters:")
logging.info("tmpdir: %s", session['tmpdir'])
logging.info("outdir: %s", session['outdir'])
logging.info("reference_strain: %s", session['reference_strain'])
logging.info("treefile: %s", session['treefile'])
logging.info("email: %s", session['email'])
logger.info("Session parameters:")
logger.info("tmpdir: %s", session['tmpdir'])
logger.info("outdir: %s", session['outdir'])
logger.info("reference_strain: %s", session['reference_strain'])
logger.info("treefile: %s", session['treefile'])
logger.info("email: %s", session['email'])
# Store session in db.
run_id = os.path.basename(session['tmpdir'])
......@@ -232,7 +224,11 @@ def submit():
notification_is_sent=False,
parent_run=old_run_id
)
dbjob.save()
logger.debug("Constructed dbjob with job ID %s.", dbjob.run_id)
logger.debug("Attempting to save dbjob in DB.")
success = dbjob.save()
logger.debug("Return code is %s", str(success))
# If one of the server provided rayt files was selected, copy it to the working dir. In the dropdown menu,
# the server provided rayts are listed without filename extension, so have to append that here.
......@@ -240,7 +236,7 @@ def submit():
if session['query_rayt'] in ['yafM_Ecoli', 'yafM_SBW25']:
query_rayt_fname = query_rayt_fname + ".faa"
src = os.path.join(app.static_folder, "rayts", session['query_rayt'] + ".faa")
logging.debug("Copying rayt from %s to %s.", src, query_rayt_fname)
logger.debug("Copying rayt from %s to %s.", src, query_rayt_fname)
shutil.copyfile(src, query_rayt_fname)
if not os.path.isfile(query_rayt_fname):
......@@ -266,6 +262,8 @@ def submit():
}
)
logger.debug("Constructed rarefan job %s.", str(rarefan_job))
run_tree_task = len(dbjob.setup['strain_names']) >= 4
if run_tree_task:
......@@ -290,6 +288,7 @@ def submit():
connection=app.redis,
)
logger.debug("Constructed tree job %s.", str(tree_job))
zip_job = RQJob.create(zip_task,
depends_on=[rarefan_job, tree_job],
on_success=on_success,
......@@ -298,6 +297,7 @@ def submit():
connection=app.redis,
kwargs={'run_dir':session['tmpdir']},
)
logger.debug("Constructed zip job %s.", str(zip_job))
email_job = RQJob.create(email_task,
depends_on=['rarefan_job',
......@@ -309,6 +309,8 @@ def submit():
kwargs={'run_id': run_id},
)
logger.debug("Constructed email job %s.", str(email_job))
# Enqueue the jobs
app.queue.enqueue_job(rarefan_job)
app.queue.enqueue_job(tree_job)
......@@ -347,7 +349,7 @@ def results():
run_id = request.form.get('run_id')
if run_id is not None:
logging.debug(run_id)
logger.debug(run_id)
dbjob = DBJob.objects.get_or_404(run_id=run_id)
......@@ -401,7 +403,7 @@ def files(req_path):
# Remove trailing '/'
if req_path.endswith('/'):
req_path = req_path[:-1]
logging.warning("Request dir is %s in (%s).", req_path, os.path.dirname(req_path))
logger.warning("Request dir is %s in (%s).", req_path, os.path.dirname(req_path))
# Save the target for the 'back to results' link.
tmp_dir = session.get('tmpdir', None)
......@@ -451,8 +453,8 @@ def rerun():
run_id = request.args['run_id']
do_repins = request.args.get('do_repins', None)
dbjob = DBJob.objects.get_or_404(run_id=run_id)
logging.debug("Found job %s", str(dbjob.id))
logging.debug("Job run_id = %s", str(dbjob.run_id))
logger.debug("Found job %s", str(dbjob.id))
logger.debug("Job run_id = %s", str(dbjob.run_id))
submit_form = SubmitForm()
......@@ -484,8 +486,8 @@ def rerun():
session['email'] = dbjob.setup.get('email')
submit_form.email.data = dbjob.setup.get('email')
logging.debug("DO_REPINS? %s", do_repins)
logging.debug("analyse_repins= %s", submit_form.analyse_repins.data)
logger.debug("DO_REPINS? %s", do_repins)
logger.debug("analyse_repins= %s", submit_form.analyse_repins.data)
# Update do_repins if requested.
if do_repins is not None:
if do_repins in ['y', '1', 1, True]:
......@@ -493,7 +495,7 @@ def rerun():
else:
submit_form.analyse_repins.data = session['analyse_repins'] = None
logging.debug('session = %s', str(session))
logger.debug('session = %s', str(session))
return render_template(
'submit.html',
......@@ -511,3 +513,12 @@ def plot():
# return redirect('http://localhost:7238?run_id={}'.format(run_id))
return redirect('http://rarefan.evolbio.mpg.de/shiny/analysis?run_id={}'.format(run_id))
@app.route('/test_task')
def test_task():
job = app.queue.enqueue(example, 10)
logger.info(job.result)
return redirect(url_for('index'))
......@@ -4,10 +4,11 @@ module rarefan: Implementation of the function that runs the rarefan java code a
import os, sys, shutil, shlex
import subprocess
import logging
from app.models import Job as DBJob
from rq import get_current_job
import logging
logging.getLogger('rarefan')
from app.utilities.rarefan_cli import rarefan_command
......
import time
import logging
logging.basicConfig(level=logging.DEBUG)
import logging
logger = logging.getLogger()
def example(seconds):
logging.info("************ Starting task *****************")
seconds=int(seconds)
for i in range(seconds):
logging.debug(i)
logger.debug(i)
time.sleep(1)
logging.info("************ Task complete *****************")
logging.debug("Done")
# example(10)
......@@ -7,7 +7,7 @@ from app.models import Job as DBJob
from rq import get_current_job
import logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('rarefan')
def tree_task(run_dir, treefile=None):
""" Generate a phylogenetic tree from all DNA sequence files in given directory.
......
......@@ -6,6 +6,8 @@ import shutil
from app.models import Job as DBJob
from rq import get_current_job
import logging
logging.getLogger('rarefan')
def zip_task(run_dir):
......
from . import checkers
from . import rarefan_cli
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment