Building a convolutional network to classify MNIST digits¶

In [1]:
import numpy as np
import torch.nn as nn
import torch
import torch.optim as optim
In [2]:
with open('mnist.npy','rb') as f:
    pics=np.load(f)
    labels=np.load(f)
print(f'Pics shape: {pics.shape}')
print(f'labels shape: {labels.shape}')
Pics shape: (60000, 28, 28)
labels shape: (60000,)

In past work with MNIST, we've flattened each data point into a single row of pixel values, throwing away the 28x28 picture structure. Here, we're keeping each data point as-is, so we can use our convolutions.

Our model¶

Here's a very simple convolutional network. Notice it's made up of two sequences, each consisting of a 2D convolution, followed by a ReLU, followed by a MaxPool.

In the first sequence, the convolution takes in one channel (because the images are merely grayscale - were they color pictures, this would be 3) and applies 16 different convolutional filters, each of which is 5x5. Each filter "strides" along one pixel at a time (so the stride is 1) and has a padding of 2, so each of the 16 filters outputs a 28x28 grid of values. The maxpool then takes the maximum value in every 2x2 block of each channel, which gives us a 16x14x14 block (14 being half of 28, resulting from the 2x2 max pool).

In the second, the convolution takes in 16 channels (i.e., the same number that the previous one outputs), creates 32 channels, and again uses 5x5 filters with a stride of 1 and a padding of 2. This gives us an output from the Conv2d (and ReLU) of 32x14x14, which the MaxPool downsamples to 32x7x7.

These 32x7x7 values are then flattened into a single vector of length 1568 (32*7*7), and linearly combined into 10 output logits.
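
If you want to check that shape arithmetic yourself, the output size of a convolution (or pooling) layer along one dimension is floor((input + 2*padding - kernel) / stride) + 1. Here's a quick sketch of that bookkeeping (conv_out_size is just a throwaway helper for illustration, not part of PyTorch):

def conv_out_size(size, kernel, stride=1, padding=0):
    # standard output-size formula: floor((size + 2*padding - kernel) / stride) + 1
    return (size + 2 * padding - kernel) // stride + 1

after_conv1 = conv_out_size(28, kernel=5, stride=1, padding=2)           # 28
after_pool1 = conv_out_size(after_conv1, kernel=2, stride=2)             # 14
after_conv2 = conv_out_size(after_pool1, kernel=5, stride=1, padding=2)  # 14
after_pool2 = conv_out_size(after_conv2, kernel=2, stride=2)             # 7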

In [3]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Sequential(   # takes in a 1x28x28
            nn.Conv2d(
                in_channels=1,              
                out_channels=16,            
                kernel_size=5,              
                stride=1,                   
                padding=2,                  
            ),                              
            nn.ReLU(),                      
            nn.MaxPool2d(kernel_size=2),    
        ) # outputs a 16x14x14
        self.conv2 = nn.Sequential(  # takes in a 16x14x14
            nn.Conv2d(16, 32, 5, 1, 2),     
            nn.ReLU(),                      
            nn.MaxPool2d(2),                
        ) # outputs a 32x7x7
        self.out = nn.Linear(32 * 7 * 7, 10)
    def forward(self, x):
        x = self.conv1(x) # 16x14x14
        x = self.conv2(x) # 32x7x7
        x = x.view(x.size(0), -1) # flatten this into a single vector of size 32*7*7=1568
        output = self.out(x) # combine into 10 output logits
        return output
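
As a quick sanity check on the shapes noted in the comments, we can push a dummy batch through an untrained copy of the model (just a sketch; nothing here is trained):

dummy = torch.zeros(1, 1, 28, 28)        # a batch of one fake grayscale image
m = CNN()
print(m.conv1(dummy).shape)              # torch.Size([1, 16, 14, 14])
print(m.conv2(m.conv1(dummy)).shape)     # torch.Size([1, 32, 7, 7])
print(m(dummy).shape)                    # torch.Size([1, 10])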

Train/test split!¶

In [4]:
from sklearn.model_selection import train_test_split
Xtr,Xte,ytr,yte=train_test_split(pics,labels)
ntr,k,_=Xtr.shape
nte,k,_=Xte.shape

Reshaping our data¶

Our data is n x 28 x 28. PyTorch expects this to be in n x channels x height x width order, so we need to make it n x 1 x 28 x 28. This is what the call to reshape() does. The rest turns the data into tensors and moves them onto the GPU.

In [5]:
Xtr = torch.tensor(Xtr, dtype=torch.float32).reshape(ntr,1,k,k).to('cuda')
ytr = torch.tensor(ytr).to('cuda')
Xte = torch.tensor(Xte, dtype=torch.float32).reshape(nte,1,k,k).to('cuda')
yte = torch.tensor(yte).to('cuda')

Build the model, move it to the GPU, and print it out just so we can see it.

In [6]:
c=CNN().to('cuda')
print(c)
CNN(
  (conv1): Sequential(
    (0): Conv2d(1, 16, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(16, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (out): Linear(in_features=1568, out_features=10, bias=True)
)

Training!¶

In [7]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(c.parameters(), lr=.01)

EPOCHS = 600

for epoch in range(EPOCHS):
    predictions = c(Xtr)
    loss = criterion(predictions, ytr)
        
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
        
    if epoch % 100 == 0:
        with torch.no_grad():
            pred = c(Xte)
            test_loss = criterion(pred, yte)
        print(f'Training Loss: {loss}, Testing Loss: {test_loss}')
Training Loss: 25.252174377441406, Testing Loss: 174.01025390625
Training Loss: 0.16313998401165009, Testing Loss: 0.17874178290367126
Training Loss: 0.09560582786798477, Testing Loss: 0.12043824791908264
Training Loss: 0.07425129413604736, Testing Loss: 0.10984784364700317
Training Loss: 0.060848090797662735, Testing Loss: 0.10514787584543228
Training Loss: 0.050651900470256805, Testing Loss: 0.10732713341712952

Saving the model¶

Here I save the model, so I don't have to train it again in my next session. torch.load() will load it back up again.

In [8]:
torch.save(c,'myMnistConv')
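
As a sketch of what the next session might look like: because we saved the entire model object (not just a state_dict), the CNN class definition has to be available when loading, and on recent PyTorch versions you may need to pass weights_only=False.

c = torch.load('myMnistConv')   # on newer PyTorch: torch.load('myMnistConv', weights_only=False)
c = c.to('cuda')
c.eval()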

Looking at the results¶

In [11]:
from sklearn.metrics import confusion_matrix
def build_confusion(predictions,truth,title):
    cm=confusion_matrix(truth,predictions)  # sklearn expects (y_true, y_pred): rows are actual, columns are predicted
    import plotly.graph_objects as go
    # Create a Plotly heatmap
    fig = go.Figure(data=go.Heatmap(
        z=cm,
        x=[f'Predicted {i}' for i in range(cm.shape[0])],
        y=[f'Actual {i}' for i in range(cm.shape[0])],
        colorscale='Viridis',  # You can choose any colorscale you like
        colorbar=dict(title='Count')
    ))
    
    # Update layout
    fig.update_layout(
        title=title,
        xaxis_title='Predicted label',
        yaxis_title='Actual label',
    )
    
    # Show plot
    fig.show()
trainPred=np.argmax(c(Xtr).detach().cpu().numpy(),axis=1)
trainTruth=ytr.cpu().numpy()
build_confusion(trainPred,trainTruth,'Training CM')
testPred=np.argmax(c(Xte).detach().cpu().numpy(),axis=1)
testTruth=yte.cpu().numpy()
build_confusion(testPred,testTruth,'Testing CM')
In [12]:
from sklearn.metrics import accuracy_score

print(f'Training accuracy is {accuracy_score(trainTruth,trainPred)}.')
print(f'Testing accuracy is {accuracy_score(testTruth,testPred)}.')
Training accuracy is 0.9875333333333334.
Testing accuracy is 0.9682.

Image Augmentation¶

When we start getting into images, the dataset gets a lot larger in terms of necessary storage. It may not be possible or efficient to load all data into memory, so you might want to retrieve a random subset ("batch") to optimize on.

Additionally, it is common to perform dataset augmentation, where a picture of a 4 can be slightly rotated, or zoomed, or cropped, to make a new picture, which is also of a 4, thereby artificially increasing the size of your dataset. This usually allows you to squeeze a few extra percentage points of accuracy out of your dataset.

In PyTorch, implementing both of these things involves making a Dataset and a DataLoader. The Dataset defines, well, the dataset, and the DataLoader defines how data points are loaded and manipulated before being trained on.

Here we've made an example PyTorch Dataset. It's made up of three things - a constructor, where you create whatever fields you need; __len__, which returns the number of elements in the dataset; and __getitem__, which defines how to get the idx-th element of the dataset.

This is a very simple example, which still requires pulling the full dataset into memory to store into self.X and self.y. Suppose our dataset was a whole lot of pictures, and this was unrealistic. Our constructor might store a file directory, __len__ might count the number of images in the directory, and __getitem__ might load and return the idx-th picture in the directory.
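
Here's a hedged sketch of what that might look like. The directory layout, the label-in-filename convention, and the use of torchvision.io.read_image are all assumptions made for illustration; they aren't part of what we actually do below.

import os
from torch.utils.data import Dataset
from torchvision.io import read_image

class FolderDataset(Dataset):
    # hypothetical dataset that loads one image file per item from a directory
    def __init__(self, directory, transforms=None):
        self.directory = directory
        # assume every .png in the directory is named like '<label>_<anything>.png'
        self.files = sorted(f for f in os.listdir(directory) if f.endswith('.png'))
        self.transforms = transforms

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        fname = self.files[idx]
        img = read_image(os.path.join(self.directory, fname)).float()  # only this image is read from disk
        label = int(fname.split('_')[0])
        if self.transforms:
            img = self.transforms(img)
        return img, label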

I do two transformations - a random rotation of up to 20 degrees in either direction, and a random sharpness adjustment. There's not a lot of good reason for the second one; I just wanted to show you how to use multiple transformations.

We do this only to the training set! Our goal with the testing set is to test them as-is, and transforming them would do nothing to combat overfitting anyway since we're not training on them.

You can find illustrations of all the transformations in the torchvision transforms documentation.

In [19]:
from torch.utils.data import Dataset
from torchvision.transforms import v2

class MnistDataset(Dataset):
    def __init__(self,X,y,transforms=None):
        self.X=X
        self.y=y
        self.transforms=transforms

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self,idx):
        theX=self.X[idx,:,:]
        they=self.y[idx]

        if self.transforms:
            theX=self.transforms(theX)
        return theX, they

trainTransforms=v2.Compose([
    v2.RandomRotation(degrees=20),
    v2.RandomAdjustSharpness(sharpness_factor=2)
])
trainingSet=MnistDataset(Xtr.cpu(),ytr.cpu(),transforms=trainTransforms)
testingSet=MnistDataset(Xte.cpu(),yte.cpu())
In [20]:
from torch.utils.data import DataLoader

trainLoader=DataLoader(trainingSet,batch_size=1000,shuffle=True,num_workers=4)
testLoader=DataLoader(testingSet,batch_size=1000,shuffle=False,num_workers=4)
In [21]:
model=CNN().to('cuda')

EPOCHS=101

criterion=nn.CrossEntropyLoss()
optimizer=optim.Adam(model.parameters(),lr=.01)

for epoch in range(EPOCHS):
    totalloss=0
    for batch, (X,y) in enumerate(trainLoader):
        X=X.to('cuda')
        y=y.to('cuda')
        predictions=model(X)
        loss=criterion(predictions,y)
    
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        totalloss+=loss.item()


    if epoch%20==0:
        totalloss/=len(trainLoader)
        print('Train error',totalloss,epoch)
        test_loss=0
        
        with torch.no_grad():
            for X,y in testLoader:
                X=X.to('cuda')
                y=y.to('cuda')
                pred=model(X)
                test_loss+=criterion(pred,y).item()
            test_loss/=len(testLoader)
        print('  Test error',test_loss,epoch)
Train error 5.906119389004178 0
  Test error 2.1022006193796794 0
Train error 0.1787058432896932 20
  Test error 0.12685690422852833 20
Train error 0.1483608391549852 40
  Test error 0.11265955716371537 40
Train error 0.14338245474629932 60
  Test error 0.10846348355213802 60
Train error 0.14501166525814269 80
  Test error 0.13462353845437366 80
Train error 0.14703123503261142 100
  Test error 0.13238660196463267 100

The above is considerably slower per epoch, as it's doing multiple backward passes per epoch (one for each minibatch), rather than just one, as well as performing the transformations. However, it's not much slower per backward pass.

Notice the training loss is much higher than the previous version. Due to the data augmentation, fitting the training data has gotten harder - this is a good thing! It makes it much harder to overfit.

It probably needed to train longer, but here's the performance.

In [22]:
trainPred=np.argmax(model(Xtr).detach().cpu().numpy(),axis=1)
trainTruth=ytr.cpu()
build_confusion(trainPred,trainTruth,'Training CM')
testPred=np.argmax(model(Xte).detach().cpu().numpy(),axis=1)
testTruth=yte.cpu()
build_confusion(testPred,testTruth,'Testing CM')
print(f'Training accuracy is {accuracy_score(trainTruth,trainPred)}.')
print(f'Testing accuracy is {accuracy_score(testTruth,testPred)}.')
Training accuracy is 0.9641555555555555.
Testing accuracy is 0.9602666666666667.