Source code for arctic_ai.image_stitch

"""Image Writing 
==========
Contains functions for stitching images together."""
import pandas as pd, numpy as np
from scipy.ndimage import label as scilabel
from skimage.measure import regionprops_table
import cv2, os, subprocess
from deepzoom import *
from deepzoom import _get_or_create_path,_get_files_path
from PIL import Image
import tqdm
import dask
from dask.diagnostics import ProgressBar
from scipy.special import softmax
import torch
from sauth import SimpleHTTPAuthHandler, serve_http
from skimage.draw import circle
from pathpretrain.utils import load_image
Image.MAX_IMAGE_PIXELS = None

[docs]class Numpy2DZI(ImageCreator): def __init__( self, tile_size=254, tile_overlap=1, tile_format="jpg", image_quality=0.8, resize_filter=None, copy_metadata=False, compression=1. ): super().__init__(tile_size,tile_overlap,tile_format,image_quality,resize_filter,copy_metadata) self.compression=compression
[docs] def create(self, source_arr, destination): """ Create a deep zoom image from the given source array. Parameters ---------- source_arr : ndarray The source image as a NumPy array. destination : str The destination folder to save the tiles to. Returns ------- str The destination folder where the tiles were saved. """ # potentially have an option where dynamically softlink once deeper layer is made so slide is readily available, push to background process and write metadata for dash app to read # speed up image saving with dask https://stackoverflow.com/questions/54615625/how-to-save-dask-array-as-png-files-slice-by-slice https://github.com/dask/dask-image/issues/110 self.image = PIL.Image.fromarray(source_arr if self.compression==1 else cv2.resize(source_arr,None,fx=1/self.compression,fy=1/self.compression,interpolation=cv2.INTER_CUBIC)) width, height = self.image.size self.descriptor = DeepZoomImageDescriptor( width=width, height=height, tile_size=self.tile_size, tile_overlap=self.tile_overlap, tile_format=self.tile_format, ) image_files = _get_or_create_path(_get_files_path(destination)) for level in tqdm.trange(self.descriptor.num_levels, desc='level'): level_dir = _get_or_create_path(os.path.join(image_files, str(level))) level_image = self.get_image(level) for (column, row) in tqdm.tqdm(self.tiles(level), desc='tiles'): bounds = self.descriptor.get_tile_bounds(level, column, row) tile = level_image.crop(bounds) format = self.descriptor.tile_format tile_path = os.path.join(level_dir, "%s_%s.%s" % (column, row, format)) tile_file = open(tile_path, "wb") if self.descriptor.tile_format == "jpg": jpeg_quality = int(self.image_quality * 100) tile.save(tile_file, "JPEG", quality=jpeg_quality) else: tile.save(tile_file) self.descriptor.save(destination) return destination
def stitch_slide(arr, dzi_out, compression=4, mask=None): if mask is not None: arr[mask]=255 Numpy2DZI(compression=compression).create(arr, dzi_out) def npy2dzi(npy_file='', dzi_out='', compression=1.): stitch_slide(load_image(npy_file),compression=compression,dzi_out=dzi_out) def stitch_slides(basename="", compression=4, dirname=".", ext=".tif", pull_mask=False, mask_compresssion=8): image=os.path.join(dirname,"inputs",f"{basename}{ext}") basename=os.path.basename(image).replace(ext,'') image=load_image(image)#np.load(image) xy_bounds=pd.read_pickle(os.path.join(dirname,"masks",f"{basename}.pkl")) write_files=[] for ID in xy_bounds: (xmin,ymin),(xmax,ymax)=xy_bounds[ID] dzi_out=os.path.join(dirname,"dzi_files",f"{basename}_{ID}_img.dzi") arr=image[xmin:xmax,ymin:ymax] mask=None if pull_mask: mask=dask.delayed(lambda x: cv2.resize(np.load(x).astype(int),arr.shape[:2],interpolation=cv2.INTER_NEAREST).astype(bool))(os.path.join(dirname,"masks",f"{basename}_{ID}.npy")) write_files.append(dask.delayed(stitch_slide)(arr,dzi_out,compression,mask)) with ProgressBar(): dask.compute(write_files,scheduler='threading')