# Making sense of data: introduction to statistics for gravitational-wave astronomy

### *AEI IMPRS Lecture Course*
### Stephen Green *stephen.green@aei.mpg.de*

---

## Practical session on machine learning for gravitational waves

In this tutorial we will build a simple **parameter estimation** neural network:
* **Training data:** TaylorF2 waveforms, high SNR, parametrized only by masses; noise added during training
* **Posterior model:** Gaussian with learnable (diagonal) covariance matrix

This should run in about a minute on a laptop.

### Exercises
1. Add new parameters, beyond the masses
2. Extend the Gaussian distribution to include general covariance
3. Make a PP plot

## Imports

In [None]:
from pycbc.waveform import get_fd_waveform, fd_approximants
from pycbc.filter import match
from pycbc.psd import aLIGOZeroDetHighPower
from pycbc.types import FrequencySeries
import numpy as np
import matplotlib.pyplot as plt
import lal
import corner

In [None]:
# pytorch imports
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from torch import nn
from torch.utils.data import DataLoader

## Training data

Generate a training set that (for simplicity) samples only over the component masses. Generate frequency-domain waveforms using TaylorF2. We will add noise during training.

**Exercise:** Add more parameters

In [None]:
num_samples = 10000  # size of the training set

m_lower = 20.0  # solar masses
m_upper = 80.0
masses = m_lower + np.random.random((num_samples, 2)) * (m_upper - m_lower)

# Make sure m1 > m2
masses = np.sort(masses, axis=-1)
masses = np.flip(masses, axis=-1)

In [None]:
masses

In [None]:
# Fixed parameters

distance = 1000  # Mpc
inclination = 0.0  # face on
spin1x = 0.0
spin1y = 0.0
spin1z = 0.0
spin2x = 0.0
spin2y = 0.0
spin2z = 0.0
phase = 0.0

In [None]:
# Waveform settings

approximant = 'TaylorF2'
f_lower = 20.0
f_final = 1024.0
T = 1.0

delta_f = 1 / T
nf = int(f_final / delta_f) + 1
f_array = np.linspace(0.0, f_final, num=nf)

In [None]:
# Noise PSD

psd = np.array(aLIGOZeroDetHighPower(nf, delta_f, 0.0))
psd[0] = psd[1]  # Fix up endpoints
psd[-1] = psd[-2]

asd = np.sqrt(psd)

In [None]:
# Generate training waveforms

hp_list = []
hc_list = []

for i in range(num_samples):
    
    mass1, mass2 = masses[i]
    
    hp, hc = get_fd_waveform(approximant=approximant,
                             mass1=mass1, mass2=mass2,
                             inclination=inclination,
                             distance=distance,
                             coa_phase = phase,
                             spin1x=spin1x, spin1y=spin1y, spin1z=spin1z,
                             spin2x=spin2x, spin2y=spin2y, spin2z=spin2z,        
                             delta_f=delta_f,
                             f_lower=f_lower, f_final=f_final)
    
    f_array = np.array(hp.sample_frequencies)
    
    # Whiten waveforms and rescale so that white noise has unit variance
    hp = hp / asd * np.sqrt(4.0 * delta_f)
    hc = hc / asd * np.sqrt(4.0 * delta_f)
    
    hp_list.append(hp)
    hc_list.append(hc)

hp = np.array(hp_list)
hc = np.array(hc_list)

In [None]:
# Sample waveform

plt.plot(f_array, hp[0].real)
plt.xscale('log')
plt.xlabel('$f$')
plt.ylabel('Re $h_+$')
plt.xlim((10, f_final))
plt.show()

### Package into a pytorch Dataset

In [None]:
# Parameters
#
# This is just the masses. It's better to sample in (Mc, q) rather than (m1, m2) because the posterior is more Gaussian

m1 = masses[:, 0]
m2 = masses[:, 1]

Mc = (m1 * m2)**(3/5) / (m1 + m2)**(1/5)
q = m2 / m1

parameters = np.stack((Mc, q), axis=1).astype(np.float32)

In [None]:
# For best training, parameters should be standardized (zero mean, unit variance across the training set)

parameters_mean = np.mean(parameters, axis=0)
parameters_std = np.std(parameters, axis=0)

parameters_standardized = (parameters - parameters_mean) / parameters_std

In [None]:
# Waveforms
#
# Truncate the arrays to remove zeros below f_lower, and repackage real and imaginary parts
#
# Only consider h_plus for now

lower_cut = int(f_lower / delta_f)
waveforms = np.hstack((hp.real[:, lower_cut:], hp.imag[:, lower_cut:])).astype(np.float32)

In [None]:
class WaveformDataset(Dataset):
    
    def __init__(self, parameters, waveforms):
        self.parameters = parameters
        self.waveforms = waveforms

    def __len__(self):
        return len(self.parameters)

    def __getitem__(self, idx):
        params = self.parameters[idx]
        signal = self.waveforms[idx]
        
        # Add unit normal noise to the signal
        noise = np.random.normal(size = signal.shape).astype(np.float32)
        data = signal + noise
        
        return torch.tensor(data), torch.tensor(params)

In [None]:
waveform_dataset = WaveformDataset(parameters_standardized, waveforms)

In [None]:
# We can sample from the WaveformDataset. This gives us pairs of data and parameters, different noise realizations each time.

x, y = waveform_dataset[0]
plt.plot(x)

## Posterior Model

In [None]:
# Neural networks are constructed by subclassing nn.Module
#
# This has to implement an __init__() and forward() method

class NeuralNetwork(nn.Module):
    
    def __init__(self, input_dim, hidden_dims, output_dim, activation=nn.ReLU()):
        super(NeuralNetwork, self).__init__()
        
        # Hidden layers
        hidden_net_list = []
        hidden_net_list.append(
            nn.Linear(input_dim, hidden_dims[0]))
        for i in range(1, len(hidden_dims)):
            hidden_net_list.append(nn.Linear(hidden_dims[i-1], hidden_dims[i]))
        self.hidden_net_list = nn.ModuleList(hidden_net_list)
        
        # Output layers
        self.output_mean = nn.Linear(hidden_dims[-1], output_dim)
        self.output_log_sigma = nn.Linear(hidden_dims[-1], output_dim)
        
        # Activation function
        self.activation = activation
        
    def forward(self, x):
        """Pass x through all the layers of the network and return the Gaussian distribution"""
        
        h = x
        for layer in self.hidden_net_list:
            h = self.activation(layer(h))

        # Output layer defines a Gaussian
        mean = self.output_mean(h)
        log_sigma = self.output_log_sigma(h)
        sigma = torch.exp(log_sigma)
        
        # Create the Gaussian distribution
        dist = torch.distributions.MultivariateNormal(loc=mean, scale_tril=torch.diag_embed(sigma))
        
        return dist

In [None]:
input_dim = waveforms.shape[-1]
output_dim = parameters.shape[-1]
hidden_dims = [512, 256, 128, 64, 32]

model = NeuralNetwork(input_dim, hidden_dims, output_dim)

In [None]:
print(model)

## Training

In [None]:
# Split the dataset into training and test sets

train_fraction = 0.8
num_train = int(round(train_fraction * num_samples))
num_test = num_samples - num_train
train_dataset, test_dataset = random_split(waveform_dataset, [num_train, num_test])

# The DataLoader is used in training

train_dataloader = DataLoader(train_dataset, batch_size=64)
test_dataloader = DataLoader(test_dataset, batch_size=64)

In [None]:
# The DataLoaders iterate over samples, returning torch tensors containing a batch of data

train_features, train_labels = next(iter(train_dataloader))

In [None]:
train_features

In [None]:
train_features.shape

In [None]:
# We use the Adam optimizer.

optimizer = torch.optim.Adam(model.parameters())

In [None]:
# Training and test loops

def train_loop(dataloader, model, optimizer):
 
    size = len(dataloader.dataset)
    train_loss = 0
    
    for batch, (X, y) in enumerate(dataloader):
        # Compute negative log probability loss
        dist = model(X)        
        loss = - dist.log_prob(y)
        
        train_loss += loss.detach().sum()
        loss = loss.mean()

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 50 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"Loss: {loss:>7f}  [{current:>5d}/{size:>5d} samples]")
            
    average_loss = train_loss.item() / size
    print('Average loss: {:.4f}'.format(average_loss))
    return average_loss
            
        
        
def test_loop(dataloader, model):
    size = len(dataloader.dataset)
    test_loss = 0

    with torch.no_grad():
        for X, y in dataloader:
            dist = model(X)
            loss = - dist.log_prob(y)
            test_loss += loss.sum()

    test_loss /= size
    print(f"Test loss: {test_loss:>8f} \n")
    return test_loss

In [None]:
epochs = 20
train_history = []
test_history = []
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    loss = train_loop(train_dataloader, model, optimizer)
    train_history.append(loss)
    loss = test_loop(test_dataloader, model)
    test_history.append(loss)
print("Done!")

In [None]:
epochs = np.arange(1, len(train_history) + 1)
plt.plot(epochs, train_history, label = 'train loss')
plt.plot(epochs, test_history, label = 'test loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

## Evaluation

### Posterior plots

In [None]:
num_posteriors = 10
num_samples = 10000

for n in range(num_posteriors):
    test_x, test_y = test_dataset[n]

    # Predict a posterior
    dist = model(test_x)

    # Sample the posterior
    pred_samples = dist.sample((10000,)).numpy()

    # Undo the standardization
    pred_samples = parameters_std * pred_samples + parameters_mean
    truth = parameters_std * test_y.numpy() + parameters_mean

    # Plot
    corner.corner(pred_samples, truths=truth, labels=['$M_c$', '$q$'])
    plt.show()