228 changes: 109 additions & 119 deletions lazyflow/operators/opLabelVolume.py
@@ -1,6 +1,8 @@

from threading import Lock as ThreadLock
from functools import partial
from abc import ABCMeta, abstractmethod, abstractproperty
import logging

import numpy as np
import vigra
@@ -12,14 +14,7 @@
from lazyflow.request import Request, RequestPool
from lazyflow.operators import OpCompressedCache, OpReorderAxes

# try to import the blockedarray module, fail only if necessary
try:
import blockedarray
except ImportError as e:
_blockedarray_module_available = False
_importMsg = str(e)
else:
_blockedarray_module_available = True
logger = logging.getLogger(__name__)


## OpLabelVolume - the **unified** connected components operator
@@ -51,15 +46,13 @@ class OpLabelVolume(Operator):
## Labeled volume
# Axistags and shape are the same as on the Input, dtype is an integer
# datatype.
# Note: The output might be cached internally depending on the chosen CC
# method, use the CachedOutput to avoid duplicate caches. (see inner
# operator below for details)
# Note: This output is just an alias for CachedOutput, because all
# implementations use internal caches.
Output = OutputSlot()

## Cached label image
# Access the internal cache. If you were planning to cache the labeled
# volume, be sure to use this slot, since it makes use of the internal
# cache that might be in use anyway.
# Axistags and shape are the same as on the Input, dtype is an integer
# datatype.
CachedOutput = OutputSlot()

name = "OpLabelVolume"
@@ -105,7 +98,7 @@ def setupOutputs(self):
self._op5_2_cached.Input.connect(self._opLabel.CachedOutput)

# set the final reordering operator's AxisOrder to that of the input
origOrder = "".join([s for s in self.Input.meta.getTaggedShape()])
origOrder = self.Input.meta.getAxisKeys()
self._op5_2.AxisOrder.setValue(origOrder)
self._op5_2_cached.AxisOrder.setValue(origOrder)

@@ -145,6 +138,8 @@ def _setBG(self):

## parent class for all connected component labeling implementations
class OpLabelingABC(Operator):
__metaclass__ = ABCMeta

## input with axes 'xyzct'
Input = InputSlot()

@@ -154,6 +149,11 @@ class OpLabelingABC(Operator):
Output = OutputSlot()
CachedOutput = OutputSlot()

## list of supported dtypes
@abstractproperty
def supportedDtypes(self):
pass

def __init__(self, *args, **kwargs):
super(OpLabelingABC, self).__init__(*args, **kwargs)
self._cache = None
@@ -162,6 +162,15 @@ def __init__(self, *args, **kwargs):
def setupOutputs(self):
labelType = np.uint32

# check if the input dtype is valid
if self.Input.ready():
dtype = self.Input.meta.dtype
if dtype not in self.supportedDtypes:
msg = "{}: dtype '{}' not supported "\
"with method 'vigra'. Supported types: {}"
msg = msg.format(self.name, dtype, self.supportedDtypes)
raise ValueError(msg)
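
# A minimal sketch of the dtype guard above, with hypothetical class names:
# each subclass publishes the dtypes it can label, and the check rejects
# unsupported input before any labeling work is started.
import numpy as np

class LabelerBase(object):
    supportedDtypes = []  # overridden by concrete labelers

    def checkDtype(self, dtype):
        if dtype not in self.supportedDtypes:
            raise ValueError("dtype '{}' not supported. Supported types: {}"
                             .format(dtype, self.supportedDtypes))

class VigraLabeler(LabelerBase):
    supportedDtypes = [np.uint8, np.uint32, np.float32]

VigraLabeler().checkDtype(np.uint32)    # passes silently
# VigraLabeler().checkDtype(np.float64) # would raise ValueError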

# remove unneeded old cache
if self._cache is not None:
self._cache.Input.disconnect()
@@ -181,11 +190,9 @@ def setupOutputs(self):
self.Output.meta.assignFrom(self._cache.Output.meta)
self.CachedOutput.meta.assignFrom(self._cache.Output.meta)

# prepare locks for each channel and time slice
s = self.Input.meta.getTaggedShape()
shape = (s['c'], s['t'])
self._cached = np.zeros(shape)

# prepare locks for each channel and time slice
locks = np.empty(shape, dtype=np.object)
for c in range(s['c']):
for t in range(s['t']):
@@ -204,7 +211,9 @@ def propagateDirty(self, slot, subindex, roi):
self.CachedOutput.setDirty(outroi)

def execute(self, slot, subindex, roi, result):
#FIXME we don't care right now which slot is requested, just return cached CC
#FIXME we don't care right now which slot is requested, just return
# cached CC (all implementations cache anyway)

# get the background values
bg = self.Background[...].wait()
bg = vigra.taggedView(bg, axistags=self.Background.meta.axistags)
@@ -216,58 +225,86 @@ def execute(self, slot, subindex, roi, result):
# do labeling in parallel over channels and time slices
pool = RequestPool()

## function for request ##
def singleSliceRequest(start3d, stop3d, c, t, bg):
# computing CC is a critical section
self._locks[c, t].acquire()

# update the slice
self._updateCache(start3d, stop3d, c, t, bg)

# leave the critical section
self._locks[c, t].release()
## end function for request ##

for t in range(roi.start[4], roi.stop[4]):
for c in range(roi.start[3], roi.stop[3]):
# only one thread may update the cache for this c and t; other requests
# must wait until labeling is finished
self._locks[c, t].acquire()
if self._cached[c, t]:
# this slice is already computed
continue
# update the whole slice
req = Request(partial(self._updateSlice, c, t, bg[c, t]))
req = Request(partial(singleSliceRequest,
roi.start[:3], roi.stop[:3],
c, t, bg[c, t]))
pool.add(req)

logger.debug("{}: Computing connected components for ROI {}".format(
self.name, roi))
pool.wait()
pool.clean()

req = self._cache.Output.get(roi)
req.writeInto(result)
req.block()

# release locks and set caching flags
for t in range(roi.start[4], roi.stop[4]):
for c in range(roi.start[3], roi.stop[3]):
self._cached[c, t] = 1
self._locks[c, t].release()
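
# A minimal sketch of the locking scheme used by execute() above, with
# hypothetical sizes: one Lock per (channel, time) slice serializes cache
# updates, so parallel requests for the same slice wait for the first
# labeling to finish instead of recomputing it.
from threading import Lock
import numpy as np

nChannels, nTimesteps = 2, 3
locks = np.empty((nChannels, nTimesteps), dtype=object)
for c in range(nChannels):
    for t in range(nTimesteps):
        locks[c, t] = Lock()

def singleSliceRequestSketch(c, t, labelSlice):
    with locks[c, t]:       # critical section, as in singleSliceRequest
        labelSlice(c, t)    # compute CC and write the result to the cache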

## compute the requested slice and put the results into self._cache
## compute the requested roi and put the results into self._cache
#
@abstractmethod
def _updateCache(self, start3d, stop3d, c, t, bg):
pass


class OpNonLazyCC(OpLabelingABC):

def setupOutputs(self):
if self.Input.ready():
s = self.Input.meta.getTaggedShape()
shape = (s['c'], s['t'])
self._cached = np.zeros(shape, dtype=np.bool)
super(OpNonLazyCC, self).setupOutputs()

## wraps the child class's _updateSlice function to check whether
## recomputation is needed
def _updateCache(self, start3d, stop3d, c, t, bg):
# we compute the whole slice, regardless of actual roi, if needed
if self._cached[c, t]:
return
self._updateSlice(c, t, bg)
self._cached[c, t] = True

@abstractmethod
def _updateSlice(self, c, t, bg):
raise NotImplementedError("This is an abstract method")
pass
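
# A minimal sketch of the compute-once policy in OpNonLazyCC: the whole
# (c, t) slice is labeled on the first request, regardless of the requested
# sub-roi, and a boolean flag prevents recomputation afterwards.
import numpy as np

cached = np.zeros((2, 3), dtype=bool)  # one flag per (channel, time) slice

def updateCacheSketch(c, t, computeSlice):
    if cached[c, t]:
        return                # slice was labeled before, nothing to do
    computeSlice(c, t)        # label the full slice into the cache
    cached[c, t] = True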


## vigra connected components
class _OpLabelVigra(OpLabelingABC):
class _OpLabelVigra(OpNonLazyCC):
name = "OpLabelVigra"
supportedDtypes = [np.uint8, np.uint32, np.float32]

def setupOutputs(self):
if self.Input.ready():
supportedDtypes = [np.uint8, np.uint32, np.float32]
dtype = self.Input.meta.dtype
if dtype not in supportedDtypes:
msg = "OpLabelVolume: dtype '{}' not supported "\
"with method 'vigra'. Supported types: {}"
msg = msg.format(dtype, supportedDtypes)
raise ValueError(msg)
super(_OpLabelVigra, self).setupOutputs()
def propagateDirty(self, slot, subindex, roi):
# set the cache to dirty
self._cached[roi.start[3]:roi.stop[3],
roi.start[4]:roi.stop[4]] = 0
super(_OpLabelVigra, self).propagateDirty(slot, subindex, roi)

def _updateSlice(self, c, t, bg):
source = vigra.taggedView(self.Input[..., c, t].wait(),
axistags='xyzct')
source = source.withAxes(*'xyz')
result = vigra.analysis.labelVolumeWithBackground(
source, background_value=int(bg))
if source.shape[2] > 1:
result = vigra.analysis.labelVolumeWithBackground(
source, background_value=int(bg))
else:
result = vigra.analysis.labelImageWithBackground(
source[..., 0], background_value=int(bg))
result = result.withAxes(*'xyzct')

stop = np.asarray(self.Input.meta.shape)
@@ -279,22 +316,37 @@ def _updateSlice(self, c, t, bg):
self._cache.setInSlot(self._cache.Input, (), roi, result)
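
# A minimal sketch of the 2D/3D dispatch above, on hypothetical data: flat
# volumes (z extent of 1) are labeled with the image variant, presumably
# because the volume variant does not handle a singleton z axis.
import numpy as np
import vigra

vol = np.zeros((8, 8, 1), dtype=np.uint8)
vol[2:4, 2:4, 0] = 1
vol = vigra.taggedView(vol, axistags='xyz')

if vol.shape[2] > 1:
    labels = vigra.analysis.labelVolumeWithBackground(
        vol, background_value=0)
else:
    labels = vigra.analysis.labelImageWithBackground(
        vol[..., 0], background_value=0)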


## blockedarray connected components
class _OpLabelBlocked(OpLabelingABC):
# try to import the blockedarray module, fail only if necessary
try:
from blockedarray import OpBlockedConnectedComponents
except ImportError as e:
_blockedarray_module_available = False
_importMsg = str(e)
class OpBlockedConnectedComponents(object):
pass
else:
_blockedarray_module_available = True
_importMsg = "No error, importing blockedarray worked."

def haveBlocked():
return _blockedarray_module_available


## Wrapper for blockedarray.OpBlockedConnectedComponents
# This wrapper ensures that the module was actually imported, and sets the
# block shape for the cache.
class _OpLabelBlocked(OpBlockedConnectedComponents):
name = "OpLabelBlocked"

def _updateSlice(self, c, t, bg):
assert _blockedarray_module_available,\
"Failed to import blockedarray: {}".format(_importMsg)
assert bg == 0,\
"Blocked Labeling not implemented for background value {}".format(bg)
"Failed to import blockedarray. Message was: {}".format(_importMsg)

blockShape = _findBlockShape(self.Input.meta.shape[:3])
blockShape = _findBlockShape(self.Input.meta.shape[:3]) + (1, 1)
logger.debug("{}: Using blockshape {}".format(self.name, blockShape))
self._cache.BlockShape.setValue(blockShape)
super(_OpLabelBlocked, self)._updateSlice(c, t, bg)

source = _Source(self.Input, blockShape, c, t)
sink = _Sink(self._cache, c, t)
cc = blockedarray.dim3.ConnectedComponents(source, blockShape)
cc.writeToSink(sink)


## Feeds metadata into OpCompressedCache
@@ -320,68 +372,6 @@ def setMeta(self, meta):
self.Output.meta.assignFrom(meta)


if _blockedarray_module_available:

def fullRoiFromPQ(slot, p, q, c, t):
fullp = np.zeros((5,), dtype=np.int)
fullq = np.zeros((5,), dtype=np.int)
fullp[:3] = p
fullq[:3] = q
fullp[3] = c
fullp[4] = t
fullq[3] = c+1
fullq[4] = t+1
return SubRegion(slot, start=fullp, stop=fullq)

class _Source(blockedarray.adapters.SourceABC):
def __init__(self, slot, blockShape, c, t):
super(_Source, self).__init__()
self._slot = slot
self._blockShape = blockShape # is passed to blockedarray!
self._p = np.asarray(slot.meta.shape[:3], dtype=np.long)*0
self._q = np.asarray(slot.meta.shape[:3], dtype=np.long)
self._c = c
self._t = t

def pySetRoi(self, roi):
assert len(roi) == 2
self._p = np.asarray(roi[0], dtype=np.long)
self._q = np.asarray(roi[1], dtype=np.long)

def pyShape(self):
return tuple(self._slot.meta.shape[:3])

def pyReadBlock(self, roi, output):
assert len(roi) == 2
roiP = np.asarray(roi[0])
roiQ = np.asarray(roi[1])
p = self._p + roiP
q = p + roiQ - roiP
if np.any(q > self._q):
raise IndexError("Requested roi is too large for selected "
"roi (previous call to setRoi)")
sub = fullRoiFromPQ(self._slot, p, q, self._c, self._t)
req = self._slot.get(sub)
req.writeInto(output[..., np.newaxis, np.newaxis])
req.block()
return True

class _Sink(blockedarray.adapters.SinkABC):
def __init__(self, op, c, t):
super(_Sink, self).__init__()
self._op = op
self._c = c
self._t = t

def pyWriteBlock(self, roi, block):
assert len(roi) == 2
roiP = np.asarray(roi[0])
roiQ = np.asarray(roi[1])
sub = fullRoiFromPQ(self._op.Input, roiP, roiQ, self._c, self._t)
self._op.setInSlot(self._op.Input, (), sub,
block[..., np.newaxis, np.newaxis])


## find a good block shape for a given input shape
# Set the block shape such that it
# (a) divides the volume shape evenly
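
# The implementation is elided above; a minimal sketch of criterion (a),
# with a hypothetical maximum edge length: per axis, pick the largest
# divisor of the extent not exceeding the maximum, so the blocks tile the
# volume evenly.
def _findBlockShapeSketch(shape3d, maxEdge=256):
    def bestDivisor(n):
        for d in range(min(n, maxEdge), 0, -1):
            if n % d == 0:
                return d
        return 1
    return tuple(bestDivisor(n) for n in shape3d)

# e.g. _findBlockShapeSketch((512, 512, 300)) == (256, 256, 150)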