Commit cf33d59c authored by cnguyen2's avatar cnguyen2
Browse files

Add annoutil.py

parent 66f5d856
#!/usr/bin/env python3
'''
TODO
'''
import argparse
import contextlib
import os.path
import logging
import gzip
import sys
import json
import requests
from collections import defaultdict
def main():
args = getArgs()
if not args.with_anno:
if args.with_segment:
pairs = parseTsv(args.with_segment)
else:
genes = []
for gene in args.gene:
if os.path.isfile(gene):
genes.extend(parseList(gene))
else:
genes.append(gene)
gene_altering_segments = (getGeneAlteringSegment(g) for g in genes)
pairs = (createGeneSegmentPair(gene, seg) for gene, segs in zip(genes, gene_altering_segments) for seg in segs)
if args.only_segment:
saveTsv(pairs, args.output, 'gene', 'segment')
return
def genAnnotatedPair():
if args.with_anno:
yield from parseJson(args.with_anno)
else:
for pair in pairs:
anns = getAnnotations(pair['segment'])
keys = ['description', 'penetranceType', 'penetranceValue']
pair['annotations'] = [{k: a[k] for k in keys} for a in anns]
if pair['annotations']:
yield pair
def genFilteredAnnotatedPair():
filter_set = set(parseList(args.filter_list)) if args.filter_list else set()
for pair in genAnnotatedPair():
pair['annotations'] = [a for a in pair['annotations'] if filterAnnotation(a, args.absolute_threshold, args.percentage_threshold, filter_set)]
if pair['annotations']:
yield pair
annotated_pairs = genFilteredAnnotatedPair() if args.filter else genAnnotatedPair()
if args.print_text:
annotated_geneset = set()
annotated_segmentset = set()
with openWrite(args.output) as f:
for pair in annotated_pairs:
annotated_geneset.add(pair['gene'])
annotated_segmentset.add(pair['segment'])
print(pair['gene'], pair['segment'], file=f, sep='\t')
for ann in pair['annotations']:
print(ann['description'], ann['penetranceType'], ann['penetranceValue'], file=f, sep=' | ')
print(file=f)
getLogger().info(f'Genes with annotations: {len(annotated_geneset)}')
getLogger().info(f'Segments with annotations: {len(annotated_segmentset)}')
getLogger().info(f'## TC({len(annotated_geneset)}) iB({len(annotated_segmentset)})')
else:
saveJson(annotated_pairs, args.output)
def getArgs():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('gene', nargs='*', help='gene id or file with gene ids')
parser.add_argument('--output', '-o', help='output file')
parser.add_argument('--only-segment', action='store_true', help='only fetch gene altering segments (default: false)')
parser.add_argument('--with-segment', '-s', help='file with gene segment pairs')
parser.add_argument('--with-anno', '-a', help='file with annotations')
parser.add_argument('--print-text', action='store_true', help='output in text format')
parser.add_argument('--filter', action='store_true', help='filter annotations')
parser.add_argument('--absolute-threshold', '-n', type=int, default=5)
parser.add_argument('--percentage-threshold', '-p', type=float, default=50)
parser.add_argument('--filter-list', '-f')
return parser.parse_args()
def filterAnnotation(ann, absolute_threshold, percentage_threshold, filter_set):
desc, pType, val = ann['description'], ann['penetranceType'], ann['penetranceValue']
if desc in filter_set:
return False
if val < 0:
# unknown penetrance
return True
elif pType == 'MISSING':
return False
elif pType == 'ABSOLUTE':
return val >= absolute_threshold
elif pType == 'PERCENTAGE':
return val >= percentage_threshold
else:
raise AssertionError(f'Unknown penetrance type {pType}')
return True
def createGeneSegmentPair(gene, segment):
return {
'gene': gene,
'segment': segment
}
def parseList(filename):
with openRead(filename) as f:
for line in f:
yield line.strip()
def parseTsv(filename):
headers = None
with openRead(filename) as f:
for line in f:
if line.startswith('#'):
headers = line.lstrip('#').rstrip('\n').split()
elif headers:
cols = line.rstrip('\n').split()
yield {h: col for h, col in zip(headers, cols)}
def parseJson(filename):
with openRead(filename) as f:
return json.load(f)
def getGeneAlteringSegment(gene):
url = f'http://ibb-test.vm19002.virt.gwdg.de/ibb/api/genesilencing/v1/silencingseqs?geneIds={gene}'
r = requests.get(url)
assert 200 <= r.status_code < 300, f'Request to {url} failed with status {r.status_code}'
return [item['id'] for item in r.json()]
def getAnnotations(segment):
url = f'http://ibb-test.vm19002.virt.gwdg.de/ibb/api/screen/v1/screens?silencingSeqId={segment}'
r = requests.get(url)
assert 200 <= r.status_code < 300, f'Request to {url} failed with status {r.status_code}'
return [obs for exp in r.json() for obs in exp['observations']]
def saveJson(obj, filename):
if filename and not filename.endswith('.json'):
filename = filename + '.json'
with openWrite(filename) as f:
json.dump(obj, f)
def saveTsv(obj_list, filename, *headers):
if filename and not filename.endswith('.tsv'):
filename = filename + '.tsv'
with openWrite(filename) as f:
if headers:
print('#' + headers[0], *headers[1:], file=f, sep='\t')
for obj in obj_list:
cols = [obj[h] for h in headers]
print(*cols, file=f, sep='\t')
@contextlib.contextmanager
def openWrite(filename=None):
'''Open file for writing or use stdout if no file is provided'''
if filename and filename != '-' and filename != 'stdout':
fh = open(filename, 'w')
else:
fh = sys.stdout
try:
yield fh
finally:
if fh is not sys.stdout:
fh.close()
@contextlib.contextmanager
def openRead(filename):
'''Open file for reading. Auto detect gzip format'''
if filename.endswith('.gz'):
fh = gzip.open(filename, 'rt')
else:
fh = open(filename, 'r')
try:
yield fh
finally:
fh.close()
LOGGER = None
def getLogger():
global LOGGER
if LOGGER is None:
LOGGER = logging.getLogger(__name__)
# fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
fmt = logging.Formatter('%(levelname)s - %(message)s')
console_hdl = logging.StreamHandler()
console_hdl.setLevel(logging.DEBUG)
console_hdl.setFormatter(fmt)
LOGGER.setLevel(logging.INFO)
LOGGER.addHandler(console_hdl)
return LOGGER
if __name__ == '__main__':
main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment