## Practical 4: Aggregables: working with massive data

## The cells of this practical can be entered (by cut and paste) into the IPython console.

## Before entering the first cell, make sure you have changed to the directory hail-practical. Skip the first cell haven't closed IPython console since running the last practical.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn
from math import log, isnan
import hail
import matplotlib.patches as mpatches

%matplotlib inline

def qqplot(pvals):
    spvals = sorted([x for x in pvals if x and not(isnan(x))])
    exp = [-log(float(i) / len(spvals), 10) for i in np.arange(1, len(spvals) + 1, 1)]
    obs = [-log(p, 10) for p in spvals]
    plt.scatter(exp, obs)
    plt.plot(np.arange(0, max(max(exp), max(obs))), c="red")
    plt.xlabel("Expected p-value (-log10 scale)")
    plt.ylabel("Observed p-value (-log10 scale)")
    plt.xlim(xmin=0)
    plt.ylim(ymin=0)

hc = hail.HailContext()

## Aggregables

Terrible name, we know.

In the practical on the Hail expression language, you learned about arrays, filter, map, and aggregating opeprations like sum, max, etc.

Hail is designed to work with very large datasets, datasets with 100s of millions (billions?) of variants and 10s or 100s of trillions of genotypes.  More data than one could hope to store on a single computer.  Therefore, Hail stores and computes on data in a distributed fasion.  But we still want a simple programming model that allows us to query and transform such distributed data.  Thats where Aggregables come in.

And `Aggregable[T]` is distributed collection of elements of type `T`.  The interface is modeled on `Array[T]`, but aggregables can be arbitrarily large and they are unordered, so they don't support operations like indexing.

Aggregables support map and filter.  Like sum, max, etc. on arrays, aggregables support operations which we call "aggregators" that operate on the entire aggregable collection and produce a summary or derived statistic.  See the [documentation](https://hail.is/hail/types.html#aggregable) for a complete list of aggregators.

Aggregables are available in expressions on various methods on [VariantDataset](https://hail.is/hail/hail.VariantDataset.html).  For example, [query_samples](https://hail.is/hail/hail.VariantDataset.html#hail.VariantDataset.query_samples) has an aggregable `samples: Aggregable[String]`, [query_variants](https://hail.is/hail/hail.VariantDataset.html#hail.VariantDataset.query_variants) has an aggregable `variants: Aggregable[Variant]` and [query_genotypes](https://hail.is/hail/hail.VariantDataset.html#hail.VariantDataset.query_genotypes) has an aggregable `gs: Aggregable[Genotype]` which is the collection of *all* the genotypes in the dataset.

Let's load a small VCF to play with.

In [None]:
vds = hc.import_vcf('sample.vcf')
print(vds.variant_schema)

There are several aggregators for obtaining elements of an aggregable.  `.take(n)` returns `n` elements of an aggregable sampled randomly. `takeBy(x => <expr>, n)` returns the `n` elements with highest value for `<expr>`.  `<expr>` must be a type that can be compared: numeric or `String`.  Let's grab a few variants and genotypes.

In [None]:
# five random variants
vds.query_variants('variants.take(5)')

In [None]:
# return five genotypes with the largest gq
vds.query_genotypes('gs.takeBy(g => g.gq, 5)')

`collect()` returns an array of *all* the elements of an array.

**Warning**: This is very dangerous for large data!  If you collect 100 trillion genotypes, it will fail, probably with an out of memory error (or something more obscure).

In these practicals, we routinely use collect to return values for plotting.  Here's an example using `collect` to return the allele number (number of called alternate alleles) per variant.

Note: We're doing *two* aggregations here.  In `annotate_variants_expr`, `gs: Aggregable[Genotype]` is the collection of all genotypes for the given variant (ranging over sample), and `variants: Aggregable[Variant]` in `query_variants` is the collection of all variants in the dataset.

`g.nNonRefAlleles` returns number of called non-reference alleles (0/0 = 0, 0/1 = 1, 1/1 = 2).  Note, we're using `map` to transform aggregables.

In [None]:
ANs = (vds.annotate_variants_expr('va.AN = gs.map(g => g.nNonRefAlleles).sum()')
       .query_variants('variants.map(v => va.AN).collect()'))
ANs[:5] # print the first 5

You might have noticed there is some tricky magic here!  In `variants.map(v => va.AN)`, we're mapping over variants but we're accessing the variant annotations `va`.  How is that possible?

Aggregables differ from arrays in one other important respect: in addition to the elements of the aggregable, each element has an associated **context**.  `map` and `filter` transform the elements of the collection, but do not change the context.  The documentation for each method which provides an aggregable will document the context for the aggregable.  For example, see the documentation for [query_genotypes_typed](https://hail.is/hail/hail.VariantDataset.html#hail.VariantDataset.query_genotypes_typed).  In `query_genotypes`, `gs: Aggregable[Genotype]` is the collection of all genotypes in the dataset.  Each genotype is associated with a variant `v` and a sample `s`, so those are in the context of `gs`.  In addition, associated to each variant is its variant annotations `va` and to each sample `s` its sample annotations `sa`, so those are in the context, too.  Finally, the genotype `g` itself is in the context, so if you map away the genotype, it is still accessible as `g`!  Here's an example to sample 8 dp fields from Het genotypes.

Note, the genotype schema is fixed: `g: Genotype`.  To see the accessible fields and methods on `Genotype`, see the [documentation](https://hail.is/hail/types.html#genotype).

In [None]:
# you can still access g since it is in the context even though we've mapped the gs aggregable to the genotype depth
vds.query_genotypes('gs.map(g => g.dp).filter(dp => g.isHet()).take(8)')

## Hard Exercise

Compute, for each variant, the top 5 five non-reference genotypes by depth and then collect collect a few variants to show the output.  Hint: like the `ANs` example above, you'll have to annotate variants withe the genotypes per variant using `takeBy`, then query variants to show a few examples.  Think in terms of the `VariantDataset` to understand the aggregable context and what variables are available.

Replace `<?>` with code.  Good luck!

In [None]:
<?>

## A few more useful aggregators

See the [documentation](https://hail.is/hail/types.html#aggregable) for the full list.

You can use `count` to count the number of elements of an aggregator.

In [None]:
# get number of variants
vds.query_variants('variants.count()')

You can compute a collection of simple statistics with `stats`.

In [None]:
# depth statisticals for all genotypes
vds.query_genotypes('gs.map(g => g.dp).stats()')

You can use `hist(min, max, bins)`, [documentation](https://hail.is/hail/types.html#aggregable-double) to compute a histogram of a numeric aggregable.  Let's use `hist` to plot the histogram of genotype quality (GQ) over all genotypes.

In [None]:
dp_hist = vds.query_genotypes('gs.map(g => g.dp).hist(0, 450, 45)')
dp_hist

In [None]:
# plot the histogram
plt.bar(dp_hist.binEdges[:-1], dp_hist.binFrequencies, width=10)
plt.show()

We'll explore the distribute of DP and GQ more in a later practical.

The aggregator `counter` counts the number of times each element occures in a aggregable.  When applied to an `Aggregable[T]`, it returns a `Dict[T, Long]` which maps each element to its count.  Let's count the distribution of genotypes (missing, 0, 1 and 2) in the dataset.

In [None]:
vds.query_genotypes('gs.map(g => g.nNonRefAlleles).counter()')

# Exercises

Recall the structure of `filtered_vds` by printing out the variant and sample schemas in the next cell.  Look at the first practical if you've forgotten how to print `VariantDataset` schemas.  Fill in the `<?>` with code.

In [None]:
<?>

In [None]:
## Number 1: What fraction of British samples (`sa.metadata.Population == "GBR"`) have purple hair?

Fill in the `<?>` with code.

In [None]:
filtered_vds.query_samples('samples.filter(s => <?>).fraction(s => <?>)')

## Number 2: What is the mean caffeine consumption among South Asian samples (`sa.metadata.SuperPopulation == "SAS"`)?

Fill in the `<?>` with code.

Hint: `.stats()` will compute a variety of useful metrics from a numeric aggregable.

In [None]:
filtered_vds.query_samples('samples.filter(s => <?>).map(s => <?>).<?>')

# You don't need to run this cell

## This can be used to recreate `filtered_vds` if you exited the IPython console

In [None]:
vds = hc.read('1kg.vds')

vds = vds.variant_qc()
vds = vds.sample_qc()
vds.persist() # We'll use these results several times, so checkpoint our computations

filtered_vds = (vds.filter_variants_expr('va.qc.callRate > 0.95')
                .filter_samples_expr('sa.qc.callRate > 0.95'))

filtered_vds = filtered_vds.annotate_samples_table('sample_annotations.txt', 
                                                   sample_expr='Sample',
                                                   root='sa.metadata',
                                                   config=TextTableConfig(impute=True))
filtered_vds = filtered_vds.annotate_variants_table('variant_annotations.txt', 
                                                   variant_expr='Variant',
                                                   code='va.consequence = table.Consequence',
                                                   config=TextTableConfig(impute=True))
