Anomaly scoring is the final stage of PatchCore where test image patches are compared against the memory bank to compute both image-level anomaly scores and pixel-level segmentation masks. This is achieved through efficient nearest neighbor search using FAISS.
Overview
The scoring process converts feature distances into interpretable anomaly measures:
Feature Extraction
Extract patch embeddings from the test image using the same pipeline as training
Nearest Neighbor Search
Find the k-nearest neighbors in the memory bank for each test patch
Distance Computation
Compute Euclidean distances to nearest neighbors
Score Aggregation
Aggregate patch scores into image-level scores and pixel-level masks
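The four steps above can be sketched end-to-end in NumPy, with a brute-force search standing in for FAISS (toy shapes, random features — purely illustrative):

```python
import numpy as np

rng = np.random.default_rng(0)
memory_bank = rng.normal(size=(200, 64))   # N normal patch features (toy sizes)
test_patches = rng.normal(size=(16, 64))   # patch features from one test image

# Steps 2-3: Euclidean distance from each test patch to every memory-bank entry
dists = np.linalg.norm(test_patches[:, None, :] - memory_bank[None, :, :], axis=-1)
patch_scores = dists.min(axis=1)           # k=1: distance to nearest neighbour

# Step 4: image score = most anomalous patch; 4x4 grid for segmentation
image_score = patch_scores.max()
anomaly_map = patch_scores.reshape(4, 4)
```

FAISS replaces the quadratic distance computation with an optimized index, but the scoring logic is exactly this.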
NearestNeighbourScorer
The core scoring class that manages the memory bank and computes anomaly scores:
class NearestNeighbourScorer(object):
    def __init__(self, n_nearest_neighbours: int, nn_method=FaissNN(False, 4)) -> None:
        """
        Nearest-Neighbourhood Anomaly Scorer class.

        Args:
            n_nearest_neighbours: [int] Number of nearest neighbours used to
                determine anomalous pixels.
            nn_method: Nearest neighbour search method.
        """
        self.feature_merger = ConcatMerger()
        self.n_nearest_neighbours = n_nearest_neighbours
        self.nn_method = nn_method

        self.imagelevel_nn = lambda query: self.nn_method.run(
            n_nearest_neighbours, query
        )
        self.pixelwise_nn = lambda query, index: self.nn_method.run(1, query, index)
Key Parameters
n_nearest_neighbours
Number of nearest neighbors to consider for scoring. Typical values:
1 (default): uses the distance to the single nearest neighbor; most common
3-5: averages distances across multiple neighbors; more robust but slower
nn_method
The nearest neighbor search backend. Options:
FaissNN(on_gpu=False): CPU-based exact search
FaissNN(on_gpu=True): GPU-accelerated exact search (recommended)
ApproximateFaissNN(): approximate search for very large memory banks
FAISS Integration
PatchCore uses Facebook’s FAISS library for efficient similarity search:
class FaissNN(object):
    def __init__(self, on_gpu: bool = False, num_workers: int = 4) -> None:
        """FAISS Nearest neighbourhood search.

        Args:
            on_gpu: If set true, nearest neighbour searches are done on GPU.
            num_workers: Number of workers to use with FAISS for similarity search.
        """
        faiss.omp_set_num_threads(num_workers)
        self.on_gpu = on_gpu
        self.search_index = None
Index Creation
def _create_index(self, dimension):
    if self.on_gpu:
        return faiss.GpuIndexFlatL2(
            faiss.StandardGpuResources(), dimension, faiss.GpuIndexFlatConfig()
        )
    return faiss.IndexFlatL2(dimension)
IndexFlatL2 performs exact nearest neighbor search using L2 (Euclidean) distance. This ensures optimal detection quality.
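What `IndexFlatL2.search` computes can be reproduced with a few lines of NumPy (note that FAISS's flat L2 index returns *squared* Euclidean distances, which preserve the nearest-neighbour ranking):

```python
import numpy as np

def exact_l2_search(index_feats, query_feats, k):
    """Brute-force stand-in for IndexFlatL2.search: returns squared L2
    distances and indices of the k nearest index rows per query."""
    sq = ((query_feats[:, None, :] - index_feats[None, :, :]) ** 2).sum(-1)
    idx = np.argsort(sq, axis=1)[:, :k]
    return np.take_along_axis(sq, idx, axis=1), idx

rng = np.random.default_rng(1)
bank = rng.normal(size=(100, 32)).astype(np.float32)
query = bank[:3] + 0.01          # queries near known bank rows
d, i = exact_l2_search(bank, query, k=1)
# i recovers rows 0, 1, 2; d is tiny for these near-duplicates
```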
Fitting the Memory Bank
def fit(self, features: np.ndarray) -> None:
    """
    Adds features to the FAISS search index.

    Args:
        features: Array of size NxD.
    """
    if self.search_index:
        self.reset_index()
    self.search_index = self._create_index(features.shape[-1])
    self._train(self.search_index, features)
    self.search_index.add(features)
Running Nearest Neighbor Search
def run(
    self,
    n_nearest_neighbours,
    query_features: np.ndarray,
    index_features: np.ndarray = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Returns distances and indices of nearest neighbour search.

    Args:
        query_features: Features to retrieve.
        index_features: [optional] Index features to search in.
    """
    if index_features is None:
        return self.search_index.search(query_features, n_nearest_neighbours)

    # Build a search index just for this search.
    search_index = self._create_index(index_features.shape[-1])
    self._train(search_index, index_features)
    search_index.add(index_features)
    return search_index.search(query_features, n_nearest_neighbours)
FAISS’s search() method returns both distances and indices of nearest neighbors. Distances are used for anomaly scores, indices for interpretability.
Prediction Pipeline
The predict() method of NearestNeighbourScorer computes anomaly scores:
def predict(
    self, query_features: List[np.ndarray]
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Predicts anomaly score.

    Searches for nearest neighbours of test images in all
    support training images.

    Args:
        query_features: [list of np.ndarrays] List of np.ndarrays
            corresponding to the test features generated by
            some backbone network.
    """
    query_features = self.feature_merger.merge(
        query_features,
    )
    query_distances, query_nns = self.imagelevel_nn(query_features)
    anomaly_scores = np.mean(query_distances, axis=-1)
    return anomaly_scores, query_distances, query_nns
Score Computation
Single Nearest Neighbor (k=1)
For each test patch, the anomaly score is simply the distance to its nearest neighbor:

# query_distances shape: [num_patches, 1]
anomaly_scores = query_distances[:, 0]
Interpretation: Higher distance = more anomalous (further from normal training patches)
Multiple Nearest Neighbors (k>1)
When using k>1, scores are averaged:

# query_distances shape: [num_patches, k]
anomaly_scores = np.mean(query_distances, axis=-1)
Benefit: More robust to noise, but slightly slower
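A toy distance matrix makes the robustness difference concrete. In the first row, a single spuriously close match would mask an anomalous patch under k=1, but averaging over k=3 dampens it (hypothetical numbers):

```python
import numpy as np

# 4 test patches x k=3 sorted neighbour distances
query_distances = np.array([
    [0.2, 0.8, 0.9],   # lucky nearest match, other neighbours are far
    [1.5, 1.6, 1.7],
    [0.1, 0.1, 0.2],
    [2.0, 2.1, 2.5],
])

scores_k1 = query_distances[:, 0]           # k=1: nearest neighbour only
scores_k3 = query_distances.mean(axis=-1)   # k=3: averaged, smoother
```

Here `scores_k1[0]` is 0.2 while `scores_k3[0]` is about 0.63, so the averaged score is less sensitive to one coincidental match in the memory bank.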
From Patches to Images
PatchCore computes both patch-level and image-level scores:
def _predict(self, images):
    """Infer score and mask for a batch of images."""
    images = images.to(torch.float).to(self.device)
    _ = self.forward_modules.eval()

    batchsize = images.shape[0]
    with torch.no_grad():
        features, patch_shapes = self._embed(images, provide_patch_shapes=True)
        features = np.asarray(features)

        # Get patch-level scores
        patch_scores = image_scores = self.anomaly_scorer.predict([features])[0]

        # Aggregate to image-level scores
        image_scores = self.patch_maker.unpatch_scores(
            image_scores, batchsize=batchsize
        )
        image_scores = image_scores.reshape(*image_scores.shape[:2], -1)
        image_scores = self.patch_maker.score(image_scores)

        # Reshape to spatial dimensions for segmentation
        patch_scores = self.patch_maker.unpatch_scores(
            patch_scores, batchsize=batchsize
        )
        scales = patch_shapes[0]
        patch_scores = patch_scores.reshape(batchsize, scales[0], scales[1])

        # Convert to segmentation masks
        masks = self.anomaly_segmentor.convert_to_segmentation(patch_scores)

    return [score for score in image_scores], [mask for mask in masks]
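The reshaping that `unpatch_scores` and the subsequent `reshape` perform can be illustrated with toy numbers (hypothetical batch of 2 images on a 4×4 patch grid):

```python
import numpy as np

batchsize, h, w = 2, 4, 4
flat_scores = np.arange(batchsize * h * w, dtype=float)  # scorer output: [B*P]

# "unpatch": regroup scores per image, then restore the spatial patch grid
per_image = flat_scores.reshape(batchsize, -1)     # [B, P]
patch_maps = per_image.reshape(batchsize, h, w)    # [B, H, W] for segmentation
image_scores = per_image.max(axis=1)               # max-pool to one score/image
```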
Image-Level Scoring
The PatchMaker.score() method aggregates patch scores using max pooling:
def score(self, x):
    was_numpy = False
    if isinstance(x, np.ndarray):
        was_numpy = True
        x = torch.from_numpy(x)
    while x.ndim > 1:
        x = torch.max(x, dim=-1).values  # reduce by max until one score remains
    if was_numpy:
        return x.numpy()
    return x
Max pooling is used instead of average pooling because anomalies are often localized. A single highly anomalous patch should flag the entire image.
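A small numeric example shows why. With a localized defect covering only 4 of 64 patches, max pooling still produces a high image score while mean pooling dilutes it toward the normal baseline:

```python
import numpy as np

# 8x8 patch-score map: mostly normal, one small anomalous region
scores = np.full((8, 8), 0.1)
scores[3:5, 3:5] = 5.0          # 4 anomalous patches out of 64

print(scores.max())             # max pooling flags the image: 5.0
print(round(scores.mean(), 3))  # mean pooling dilutes it: 0.406
```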
Segmentation Mask Generation
The RescaleSegmentor converts patch scores into smooth segmentation masks:
class RescaleSegmentor:
    def __init__(self, device, target_size=224):
        self.device = device
        self.target_size = target_size
        self.smoothing = 4

    def convert_to_segmentation(self, patch_scores):
        with torch.no_grad():
            if isinstance(patch_scores, np.ndarray):
                patch_scores = torch.from_numpy(patch_scores)
            _scores = patch_scores.to(self.device)
            _scores = _scores.unsqueeze(1)
            _scores = F.interpolate(
                _scores, size=self.target_size, mode="bilinear", align_corners=False
            )
            _scores = _scores.squeeze(1)
            patch_scores = _scores.cpu().numpy()

        return [
            ndimage.gaussian_filter(patch_score, sigma=self.smoothing)
            for patch_score in patch_scores
        ]
Segmentation Pipeline
Spatial Reshaping
Convert flat patch scores back to 2D grid (e.g., 56×56)
Bilinear Upsampling
Interpolate to original image resolution (e.g., 224×224)
Gaussian Smoothing
Apply Gaussian filter with σ=4 for smooth boundaries
The smoothing step is crucial for visualization and removes blocky artifacts from patch-based scoring.
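The three pipeline steps can be sketched with SciPy alone (assuming SciPy is available; `ndimage.zoom` with `order=1` stands in for the bilinear interpolation that `F.interpolate` performs on the GPU):

```python
import numpy as np
from scipy import ndimage

# Step 1: patch scores already reshaped to a 2D grid
patch_scores = np.random.default_rng(2).random((56, 56))

# Step 2: bilinear upsample 56x56 -> 224x224
upsampled = ndimage.zoom(patch_scores, zoom=224 / 56, order=1)

# Step 3: Gaussian smoothing with sigma=4 removes blocky patch artifacts
mask = ndimage.gaussian_filter(upsampled, sigma=4)
```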
Feature Merging
Before scoring, features from multiple layers are concatenated:
class _BaseMerger:
    def __init__(self):
        """Merges feature embedding by name."""

    def merge(self, features: list):
        features = [self._reduce(feature) for feature in features]
        return np.concatenate(features, axis=1)

class ConcatMerger(_BaseMerger):
    @staticmethod
    def _reduce(features):
        # NxCxWxH -> NxCWH
        return features.reshape(len(features), -1)
When using multiple backbone layers (e.g., layer2 + layer3), their features are flattened and concatenated before distance computation.
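The effect of the merge is just flatten-and-concatenate along the channel axis. A toy example with ones/zeros standing in for layer2 and layer3 features (hypothetical channel counts):

```python
import numpy as np

# Per-patch features from two backbone layers, already pooled to a
# common patch grid: N patches, different channel counts
layer2_feats = np.ones((10, 512))
layer3_feats = np.zeros((10, 1024))

# ConcatMerger-style: flatten each to [N, D_i], concatenate along channels
merged = np.concatenate(
    [layer2_feats.reshape(10, -1), layer3_feats.reshape(10, -1)], axis=1
)
# merged has shape [10, 1536]: each patch carries both layers' features
```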
ApproximateFaissNN
For very large memory banks (>1M patches), approximate search can be used:
class ApproximateFaissNN(FaissNN):
    def _train(self, index, features):
        index.train(features)

    def _gpu_cloner_options(self):
        cloner = faiss.GpuClonerOptions()
        cloner.useFloat16 = True
        return cloner

    def _create_index(self, dimension):
        index = faiss.IndexIVFPQ(
            faiss.IndexFlatL2(dimension),
            dimension,
            512,  # n_centroids
            64,   # sub-quantizers
            8,    # nbits per code
        )
        return self._index_to_gpu(index)
IndexIVFPQ Parameters
n_centroids (512): Number of Voronoi cells for the inverted index. More centroids = better accuracy but slower.
sub-quantizers (64): Number of sub-quantizers for product quantization. Higher = better approximation.
nbits (8): Bits per sub-quantizer code. Typically 8 bits (256 values).
IndexIVFPQ requires a training step and provides approximate results. Only use when exact search is too slow.
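The memory savings from product quantization follow directly from the parameters above. Assuming 1024-dimensional float32 features (an illustrative dimensionality, not fixed by PatchCore):

```python
# Bytes per vector: flat float32 storage vs. PQ codes
dim, n_vectors = 1024, 1_000_000

flat_bytes = dim * 4               # IndexFlatL2: 4096 bytes per vector
pq_bytes = 64 * 8 // 8             # 64 sub-quantizers x 8 bits = 64 bytes

print(flat_bytes * n_vectors / 1e9)   # ~4.1 GB for the exact index
print(pq_bytes * n_vectors / 1e9)     # ~0.064 GB for the compressed codes
print(flat_bytes // pq_bytes)         # 64x compression
```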
Complete Scoring Example
Here’s a complete example of the scoring process:
# Training phase
patchcore = PatchCore(device='cuda')
patchcore.load(...)              # Configure model
patchcore.fit(train_dataloader)  # Build memory bank

# Inference phase
test_image = load_image('test.jpg')  # Shape: [1, 3, 224, 224]
scores, masks, _, _ = patchcore.predict(test_image)

# Results
image_score = scores[0]  # Scalar anomaly score
seg_mask = masks[0]      # Shape: [224, 224]

# Thresholding
is_anomalous = image_score > threshold  # e.g., threshold=0.5
anomalous_pixels = seg_mask > threshold
Scoring Metrics
PatchCore is evaluated using several metrics:
Image-level AUROC: Area under the ROC curve for binary classification (normal vs. anomalous). Target: >99% on MVTec AD.
Pixel-level AUROC: Area under the ROC curve for pixel-wise anomaly localization. Target: >98% on MVTec AD.
PRO: Per-Region-Overlap score measuring localization quality. Target: >95% on MVTec AD.
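AUROC can be computed directly from anomaly scores without choosing a threshold. A minimal NumPy sketch using the rank-based (Mann-Whitney U) formulation:

```python
import numpy as np

def auroc(scores, labels):
    """Probability that a random anomalous sample scores higher than a
    random normal one (rank-based AUROC, no thresholding needed)."""
    scores, labels = np.asarray(scores, float), np.asarray(labels, int)
    pos, neg = scores[labels == 1], scores[labels == 0]
    # Compare every anomalous score against every normal score
    greater = (pos[:, None] > neg[None, :]).sum()
    ties = (pos[:, None] == neg[None, :]).sum()
    return (greater + 0.5 * ties) / (len(pos) * len(neg))

# Perfectly separated toy scores -> AUROC = 1.0
print(auroc([0.9, 0.8, 0.7, 0.2, 0.1], [1, 1, 1, 0, 0]))
```

In practice, image-level AUROC is computed over per-image scores and pixel-level AUROC over all mask pixels; libraries such as scikit-learn provide an equivalent `roc_auc_score`.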
Optimization Tips
GPU Acceleration: Always use --faiss_on_gpu for datasets with >10k memory bank patches:

python bin/run_patchcore.py \
    patch_core ... --faiss_on_gpu \
    ...
This can provide 10-50× speedup for nearest neighbor search.
Batch Inference: Process multiple images together to maximize GPU utilization:

# Instead of:
for img in images:
    score = patchcore.predict(img)

# Do:
scores = patchcore.predict(image_batch)  # Batch size: 16-32
Saving and Loading
The scorer can be saved and loaded for deployment:
def save(
    self,
    save_folder: str,
    save_features_separately: bool = False,
    prepend: str = "",
) -> None:
    self.nn_method.save(self._index_file(save_folder, prepend))
    if save_features_separately:
        self._save(
            self._detection_file(save_folder, prepend), self.detection_features
        )

def load(self, load_folder: str, prepend: str = "") -> None:
    self.nn_method.load(self._index_file(load_folder, prepend))
    if os.path.exists(self._detection_file(load_folder, prepend)):
        self.detection_features = self._load(
            self._detection_file(load_folder, prepend)
        )
Saved files:
nnscorer_search_index.faiss: FAISS index containing memory bank
nnscorer_features.pkl (optional): Raw feature vectors
patchcore_params.pkl: Model configuration
Visualization Example
import matplotlib.pyplot as plt

# Run inference
image_score, seg_mask = patchcore.predict(test_image)[0:2]

# Visualize
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

axes[0].imshow(test_image.squeeze().permute(1, 2, 0))
axes[0].set_title('Input Image')

axes[1].imshow(seg_mask[0], cmap='jet')
axes[1].set_title(f'Anomaly Map (Score: {image_score[0]:.3f})')

axes[2].imshow(test_image.squeeze().permute(1, 2, 0))
axes[2].imshow(seg_mask[0], cmap='jet', alpha=0.5)
axes[2].set_title('Overlay')

plt.show()
Next Steps
PatchCore Algorithm Review the complete algorithm pipeline
Quick Start Start building your own anomaly detector