import os
import sys
import scipy
import pydicom
import duo.core.material as material
import duo.core.element_table as element_table
import numpy as np
import duo.core.timer as timer
import duo.core.dicom_decoder as dicom_decoder
import duo.zeff.enhance as enhance
import matplotlib
matplotlib.use('PDF')
import matplotlib.pyplot as plt
from matplotlib import cm
import matplotlib.colors as mc
import duo.zeff.blend as blend
import pydicom.pixel_data_handlers.util as pdutil
import cv2 as cv
import string
#------------------------------------------------------------
#------------------------------------------------------------
[docs]class Common:
#------------------------------------------------------------
#------------------------------------------------------------
def __init__(self,
dataPath,
method = "original",
use140kVpWithSn = True,
use100kVp = False):
self.method = method
self.dataPath = dataPath
self.elementTable = None
self.CTNumber_Elow = -1000
self.CTNumber_Ehigh = 3095
# mu/rho
self.muWater_Ehigh = 0.0
self.muWater_Elow = 0.0
self.muAir_Ehigh = 0.0
self.muAir_Elow = 0.0
# Van Abbema, J.K., Van der Schaaf, A., Kristanto, W.,
# Groen, J.M. and Greuter, M.J., 2012. Feasibility and
# accuracy of tissue characterization with dual source
# computed tomography. Physica Medica, 28(1), pp.25-32.
# according to https://radiologykey.com/technical-aspects-of-dual-energy-ct-with-dual-source-ct-systems/
# The SOMATOM Definition Flash uses an additional tin filter (Sn) with a thickness of 0.4 mm to shift the
# mean energy of the 140 kV spectrum from 69 to 89 keV, see Fig. 2.2. The mean energy of the 80 kV spectrum is 52 keV.
if use140kVpWithSn: # 140 kVp with Sn (Tin) filter
self.Ehigh = 89
else: # without the filter
self.Ehigh = 69.28
if use100kVp: # temporary fix
self.Elow = 63.292 # keV, 100 kVp
else: # normal case: 80 kVp
self.Elow = 51.93 # keV, 80 kVp
self.Eave = (self.Ehigh + self.Elow) / 2.0
self.Initialize()
#------------------------------------------------------------
# For some reason, NIST material has two versions
# original: https://physics.nist.gov/cgi-bin/Star/compos.pl?matno=276
# alt: https://physics.nist.gov/PhysRefData/XrayMassCoef/tab2.html
#------------------------------------------------------------
[docs] def Initialize(self):
print("--> Python version = ", sys.version)
print(" matplotlib version = ", matplotlib.__version__)
print(" numpy version = ", np.version.version)
print(" scipy version = ", scipy.__version__)
# load xs data
self.elementTable = element_table.ElementTable(self.dataPath)
water = material.Material("Water", self.elementTable)
if self.method == "original":
water.AddElement(1 , weightFraction = 0.111894)
water.AddElement(8 , weightFraction = 0.888106)
water.Commit()
water.density = 1.0
elif self.method == "alt":
water.AddElement(1 , weightFraction = 0.111898)
water.AddElement(8 , weightFraction = 0.888102)
water.Commit()
water.density = 1.0
air = material.Material("Air", self.elementTable)
if self.method == "original":
air.AddElement(6 , weightFraction = 0.000124)
air.AddElement(7 , weightFraction = 0.755267)
air.AddElement(8 , weightFraction = 0.231781)
air.AddElement(18 , weightFraction = 0.012827)
air.Commit()
air.density = 1.20479e-03
elif self.method == "alt":
air.AddElement(6 , weightFraction = 0.000124)
air.AddElement(7 , weightFraction = 0.755268)
air.AddElement(8 , weightFraction = 0.231781)
air.AddElement(18 , weightFraction = 0.012827)
air.Commit()
air.density = 1.205E-03
self.muWater_Ehigh = water.CalculateMacAtE(self.Ehigh) * water.density
self.muWater_Elow = water.CalculateMacAtE(self.Elow) * water.density
self.muAir_Ehigh = air.CalculateMacAtE(self.Ehigh) * air.density
self.muAir_Elow = air.CalculateMacAtE(self.Elow) * air.density
print(" muWater_Ehigh = {0:.16e}".format(self.muWater_Ehigh))
print(" muWater_Elow = {0:.16e}".format(self.muWater_Elow ))
print(" muAir_Ehigh = {0:.16e}".format(self.muAir_Ehigh ))
print(" muAir_Elow = {0:.16e}".format(self.muAir_Elow ))
#------------------------------------------------------------
#------------------------------------------------------------
[docs] def ProcessCTImage(self, fileName):
df = pydicom.dcmread(fileName)
print("--> process CT image")
print(" RescaleSlope = ", df.RescaleSlope)
print(" RescaleIntercept = ", df.RescaleIntercept)
# converts raw pixel data values to a specific (possibly unitless) physical quantity,
# such as Hounsfield units for CT
# applies Modality LUT transformation to raw data
# df.pixel_array may be uint16
# image is float64
image = pdutil.apply_modality_lut(df.pixel_array, df)
print(" Before adjustment: pixel data range = [{0:f} {1:f}]".format(np.amin(image), np.amax(image)))
print(" df.pixel_array type = {0:s}, shape = {1:s}".format(
str(df.pixel_array.dtype),
str(df.pixel_array.shape)))
print(" image type = {0:s}, shape = {1:s}".format(
str(image.dtype),
str(image.shape)))
# clamp
# for out-of-range data
image = np.where(image < self.CTNumber_Elow, self.CTNumber_Elow, image)
image = np.where(image > self.CTNumber_Ehigh, self.CTNumber_Ehigh, image)
print(" After adjustment: pixel data range = [{0:f} {1:f}]".format(np.amin(image), np.amax(image)))
# clamp
# for pixels with low HU, set them to air
image = np.where(image < -800, -1000, image)
return image
#------------------------------------------------------------
#------------------------------------------------------------
[docs] def ConvertCTNumberToMu(self, CTNumber, mu_water, mu_air):
mu = CTNumber / 1000.0 * (mu_water - mu_air) + mu_water
return mu
#------------------------------------------------------------
#------------------------------------------------------------
[docs] def ConvertMuToCTNumber(self, mu, mu_water, mu_air):
CTNumber = (mu - mu_water) / (mu_water - mu_air) * 1000.0
return CTNumber
#------------------------------------------------------------
#------------------------------------------------------------
[docs] def CalculateGivenCTNumber(self, imageHighKVP, imageLowKVP, methodObj, useGPU = False):
imageZeff = None
if useGPU:
imageZeff = methodObj.CalculateGivenCTNumberGPU(imageHighKVP, imageLowKVP)
else:
imageZeff = methodObj.CalculateGivenCTNumber(imageHighKVP, imageLowKVP)
return imageZeff
#------------------------------------------------------------
# full plot:
# highKVP
# lowKVP
# mixed
# Zeff
# reference
# mixed with EC
#------------------------------------------------------------
[docs] def ApplyModel(self, lowHighDicomDir, fileName, outputImageName, methodObj, **kwargs):
msg = "--> Mean high energy = {:f}, mean low energy = {:f}, average = {:f}".format(self.Ehigh, self.Elow, self.Eave)
print(msg)
# whether to apply denoising to the images
# the original highKVP, lowKVP, mixed and reference are always plotted, whereas
# the denoised Zeff and mixed with EC can be optionally plotted when applyDenoise is True
applyDenoise = kwargs.get("applyDenoise", True)
imageMask = kwargs.get("imageMask", None)
# whether to process the mixed image in the specified dicom dir
# the mixed image shall be stored in a directory separate from low/high kVp images
mixedImageDicomDir = kwargs.get("mixedImageDicomDir", None)
showMixedImage = kwargs.get("showMixedImage", True)
# whether to apply EC to mixed image
applyEC = kwargs.get("applyEC", False)
saveECImage = kwargs.get("saveECImage", False)
referenceImagePath = kwargs.get("referenceImagePath", None)
useGPU = kwargs.get("useGPU", False)
zeffWindowLower = kwargs.get("zeffWindowLower", 7)
zeffWindowUpper = kwargs.get("zeffWindowUpper", 12)
tmg = timer.TimerManager()
tmg.StartOrResume("total")
lowKVPFileName = ""
highKVPFileName = ""
mixedFileName = ""
dd = dicom_decoder.dect_dicom_decoder()
dd.ProcessDirectory(lowHighDicomDir)
if mixedImageDicomDir:
dd.ProcessDirectoryWithMixedImage(mixedImageDicomDir)
lowOrHighKVP = dd.CheckLowOrHighKVP(fileName)
if lowOrHighKVP == "low":
lowKVPFileName = fileName
highKVPFileName = dd.FindPairingFile(fileName)
if mixedImageDicomDir:
mixedFileName = dd.FindMixedFile(fileName)
elif lowOrHighKVP == "high":
highKVPFileName = fileName
lowKVPFileName = dd.FindPairingFile(fileName)
if mixedImageDicomDir:
mixedFileName = dd.FindMixedFile(fileName)
enh = enhance.Enhance()
imageLowKVP = self.ProcessCTImage(lowKVPFileName)
imageLowKVPDenoised = enh.Denoise(imageLowKVP)
imageHighKVP = self.ProcessCTImage(highKVPFileName)
imageHighKVPDenoised = enh.Denoise(imageHighKVP)
imageMixed = None
if mixedImageDicomDir:
imageMixed = self.ProcessCTImage(mixedFileName)
imageMixedDenoised = enh.Denoise(imageMixed)
imageZeff = None
if applyDenoise:
imageZeff = self.CalculateGivenCTNumber(imageHighKVPDenoised, imageLowKVPDenoised, methodObj, useGPU)
else:
imageZeff = self.CalculateGivenCTNumber(imageHighKVP, imageLowKVP, methodObj, useGPU)
imageMixed_ec = None
if applyEC:
if applyDenoise:
imageMixed_ec = self.ElectronicCleanse(imageLowKVPDenoised,
imageHighKVPDenoised,
imageMixedDenoised,
imageZeff,
zeffWindowLower,
zeffWindowUpper,
imageMask)
else:
imageMixed_ec = self.ElectronicCleanse(imageLowKVP,
imageHighKVP,
imageMixed,
imageZeff,
zeffWindowLower,
zeffWindowUpper,
imageMask)
print(" ec mixed image type = {0:s}, shape = {1:s}".format(
str(imageMixed_ec.dtype),
str(imageMixed_ec.shape)))
if saveECImage:
outputName = os.path.basename(mixedFileName)
outputFullPath = "ec_" + outputName
self.SaveDicom(mixedFileName,
imageMixed_ec,
outputFullPath)
if applyDenoise:
np.save("image_high_kvp", imageHighKVPDenoised)
np.save("image_low_kvp", imageLowKVPDenoised)
else:
np.save("image_high_kvp", imageHighKVP)
np.save("image_low_kvp", imageLowKVP)
np.save("image_zeff", imageZeff)
referenceImage = None
if referenceImagePath:
referenceImage = self.ProcessCTImage(referenceImagePath)
tmg.Stop("total")
totalTime = tmg.GetElapsedTimeInSecond("total")
print("--> total time = {0:f} [s]".format(totalTime))
totalNumImages = 3
if mixedImageDicomDir:
totalNumImages += 1
if applyEC:
totalNumImages += 1
if referenceImagePath:
totalNumImages += 1
self.AddImageRegular(imageHighKVP, "high_kvp_{:s}".format(outputImageName))
self.AddImageRegular(imageLowKVP, "low_kvp_{:s}".format(outputImageName))
if mixedImageDicomDir and showMixedImage:
self.AddImageRegular(imageMixed, "mixed_image_{:s}".format(outputImageName))
self.AddImageZeff(imageZeff, "zeff_image_{:s}".format(outputImageName))
if referenceImagePath:
self.AddImageRegular(referenceImage, "reference_{:s}".format(outputImageName))
if applyEC:
self.AddImageRegular(imageMixed_ec, "mixed_image_with_ec_{:s}".format(outputImageName))
imageDict = {"imageZeff" : imageZeff,
"imageMixed" : imageMixed,
"imageMixed_ec" : imageMixed_ec,
"referenceImage" : referenceImage,
"imageHighKVP" : imageHighKVP,
"imageLowKVP" : imageLowKVP}
return imageDict
#------------------------------------------------------------
#------------------------------------------------------------
[docs] def ElectronicCleanse(self, imageLowKVP,
imageHighKVP,
imageMixed,
imageZeff,
zeffWindowLower,
zeffWindowUpper,
imageMask):
def AlphaTransform(pixelMixed,
pixelZeff,
pixelGradY,
thresholdHuGrad,
HUAir,
pixelMask):
newPixelMixed = pixelMixed
# do not apply alpha transform outside of mask
#if pixelMask == 0:
# return newPixelMixed
# if Zeff is too large, set the pixel value to air
if pixelZeff >= zeffWindowUpper:
newPixelMixed = HUAir
# if Zeff is not large, do nothing
elif pixelZeff <= zeffWindowLower:
newPixelMixed = pixelMixed
# if Zeff is between the window, transform it
else:
# method 1: linear
# temp = (pixelZeff - zeffWindowUpper) / (zeffWindowLower - zeffWindowUpper)
# newPixelMixed = (pixelMixed - HUAir) * temp + HUAir
# method 2: non-linear
temp = (pixelZeff - zeffWindowUpper) / (zeffWindowLower - zeffWindowUpper)
temp = np.power(temp, 2.0)
newPixelMixed = (pixelMixed - HUAir) * temp + HUAir
#if pixelGradY >= thresholdHuGrad:
# newPixelMixed = HUAir
return newPixelMixed
# reference: https://docs.opencv.org/2.4/modules/imgproc/doc/filtering.html?highlight=sobel#sobel
gradY = cv.Sobel(imageMixed, cv.CV_64F, 0, 1, ksize = 5)
thresholdHuGrad = np.quantile(gradY, 0.98)
HUAir = -1000
AlphaTransform_v = np.vectorize(AlphaTransform)
imageMixed_ec = AlphaTransform_v(imageMixed,
imageZeff,
gradY,
thresholdHuGrad,
HUAir,
imageMask)
return imageMixed_ec
#------------------------------------------------------------
#------------------------------------------------------------
[docs] def AddImageRegular(self, imageRegular, title):
fig = plt.figure(figsize = (6, 6))
ax = fig.add_subplot(1,1,1)
m = ax.imshow(imageRegular,
interpolation = 'none',
cmap = cm.gray
#vmin = -1000,
#vmax = 1800
)
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
plt.savefig(title, bbox_inches = 'tight')
#------------------------------------------------------------
#------------------------------------------------------------
[docs] def AddImageZeff(self, imageZeff, title):
fig = plt.figure(figsize = (6, 4))
plt.subplots_adjust(right = 0.9)
ax = fig.add_subplot(1,1,1)
m = ax.imshow(imageZeff,
interpolation = 'none',
cmap = cm.viridis,
vmin = 0,
vmax = 20
)
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
cbar = fig.colorbar(m, ax = ax)
cbar.ax.locator_params(nbins = 12) # nbins is the max number of bins
plt.savefig(title, bbox_inches = 'tight')
#------------------------------------------------------------
#------------------------------------------------------------
[docs] def PlotSingleImage(self, imageData, outputImageName, markWrongData = True):
imageMaxValue = 10.0
imageMinValue = 0.0
if markWrongData:
upperBoundValue = 2.2
lowerBoundValue = 1.0
numBin = 512
delta = (imageMaxValue - imageMinValue) / numBin
# new colormap used to mark problematic pixels
viridis = cm.get_cmap('viridis', numBin)
newcolors = viridis(np.linspace(0, 1, numBin))
upperBoundColor = np.array([0.5, 1.0, 1.0, 1])
lowerBoundColor = np.array([1.0, 0.0, 0.0, 1])
temp = (upperBoundValue - imageMinValue) / delta
newcolors[int(temp) : numBin, :] = upperBoundColor
temp = (lowerBoundValue - imageMinValue) / delta
newcolors[0 : int(temp) + 1, :] = lowerBoundColor
newcmp = mc.ListedColormap(newcolors)
else:
newcmp = cm.viridis
fig = plt.figure(figsize = (10, 10))
ax = fig.add_subplot(1, 1, 1)
m = ax.imshow(imageData, interpolation = 'none', cmap = newcmp, vmin = imageMinValue, vmax = imageMaxValue)
cbar = fig.colorbar(m, ax = ax)
cbar.ax.locator_params(nbins = 12) # nbins is the max number of bins
plt.savefig(outputImageName)
np.save("image_data", imageData)
#------------------------------------------------------------
#------------------------------------------------------------
[docs] def ShowImageInfo(self, image):
print(" Data type = ", image.dtype)
print(" Max = ", np.amax(image))
print(" Min = ", np.amin(image))
print(" Shape = ", image.shape)
print(" Order = ", np.isfortran(image))
#------------------------------------------------------------
#------------------------------------------------------------
[docs] def ApplyModelIteratively(self, lowHighDicomDir, mixedImageDicomDir, outputECDicomDir, methodObj, **kwargs):
applyDenoise = kwargs.get("applyDenoise", True) # by default, apply denoise
useGPU = kwargs.get("useGPU", False)
zeffWindowLower = kwargs.get("zeffWindowLower", 7)
zeffWindowUpper = kwargs.get("zeffWindowUpper", 12)
enh = enhance.Enhance()
if not os.path.exists(outputECDicomDir):
os.mkdir(outputECDicomDir)
dd = dicom_decoder.dect_dicom_decoder()
dd.ProcessDirectoryWithLowHighAndMixedImages(lowHighDicomDir, mixedImageDicomDir)
tmg = timer.TimerManager()
tmg.StartOrResume("total")
# dectDicomInfo is a list of DECTDicomPair objects
dectDicomInfo = list(dd.dectDicomDict.values())[0]
for idx in range(len(dectDicomInfo)):
item = dectDicomInfo[idx]
print("\n\n\n--> idx = ", idx)
# print(" {0:s}".format(item.dicomFileLowKVP.fullFileName))
# print(" {0:s}".format(item.dicomFileHighKVP.fullFileName))
# print(" {0:s}".format(item.dicomFileMixed.fullFileName))
imageLowKVP = self.ProcessCTImage(item.dicomFileLowKVP.fullFileName)
if applyDenoise:
imageLowKVP = enh.Denoise(imageLowKVP)
imageHighKVP = self.ProcessCTImage(item.dicomFileHighKVP.fullFileName)
if applyDenoise:
imageHighKVP = enh.Denoise(imageHighKVP)
imageMixed = self.ProcessCTImage(item.dicomFileMixed.fullFileName)
if applyDenoise:
imageMixed = enh.Denoise(imageMixed)
imageZeff = self.CalculateGivenCTNumber(imageHighKVP,
imageLowKVP,
methodObj,
useGPU)
imageMixed_ec = self.ElectronicCleanse(imageLowKVP,
imageHighKVP,
imageMixed,
imageZeff,
zeffWindowLower,
zeffWindowUpper)
outputName = os.path.basename(item.dicomFileMixed.fullFileName)
outputName = "ec_{0:03d}_{1:s}".format(idx, outputName)
outputFullPath = os.path.join(outputECDicomDir, outputName)
self.SaveDicom(item.dicomFileMixed.fullFileName,
imageMixed_ec,
outputFullPath)
tmg.Stop("total")
totalTime = tmg.GetElapsedTimeInSecond("total")
print("--> total time = {0:f} [s]".format(totalTime))
#------------------------------------------------------------
#------------------------------------------------------------
[docs] def SaveDicom(self, inputFileName,
imageMixed_ec,
outputFullPath):
df = pydicom.dcmread(inputFileName)
tempImage = (imageMixed_ec - df.RescaleIntercept) / df.RescaleSlope
tempImage = tempImage.astype(np.uint16)
df.PixelData = tempImage.tobytes()
df.save_as(outputFullPath)
#------------------------------------------------------------
# Given a known material, calculate its CT numbers at high and low energies
# to form a one-pixel image. Then apply given method to the one-pixel image
# to calculate its Z values.
#------------------------------------------------------------
[docs] def VerifyModel(self, knownMaterial, methodObj):
muLow = knownMaterial.CalculateMacAtE(self.Elow) * knownMaterial.density
muHigh = knownMaterial.CalculateMacAtE(self.Ehigh) * knownMaterial.density
imageLowKVP = self.ConvertMuToCTNumber(muLow, self.muWater_Elow, self.muAir_Elow)
imageHighKVP = self.ConvertMuToCTNumber(muHigh, self.muWater_Ehigh, self.muAir_Ehigh)
imageZeff = self.CalculateGivenCTNumber(imageHighKVP, imageLowKVP, methodObj)
msg = ">>> material = {:s}, CT low = {:f}, CT high = {:f}, Zeff = {:f}".format(knownMaterial.name,
imageLowKVP,
imageHighKVP,
imageZeff)
print(msg)