diff --git a/lazyflow/operators/opLabelVolume.py b/lazyflow/operators/opLabelVolume.py index 73cc815b2..d38c2b74c 100644 --- a/lazyflow/operators/opLabelVolume.py +++ b/lazyflow/operators/opLabelVolume.py @@ -1,6 +1,8 @@ from threading import Lock as ThreadLock from functools import partial +from abc import ABCMeta, abstractmethod, abstractproperty +import logging import numpy as np import vigra @@ -12,14 +14,7 @@ from lazyflow.request import Request, RequestPool from lazyflow.operators import OpCompressedCache, OpReorderAxes -# try to import the blockedarray module, fail only if neccessary -try: - import blockedarray -except ImportError as e: - _blockedarray_module_available = False - _importMsg = str(e) -else: - _blockedarray_module_available = True +logger = logging.getLogger(__name__) ## OpLabelVolume - the **unified** connected components operator @@ -51,15 +46,13 @@ class OpLabelVolume(Operator): ## Labeled volume # Axistags and shape are the same as on the Input, dtype is an integer # datatype. - # Note: The output might be cached internally depending on the chosen CC - # method, use the CachedOutput to avoid duplicate caches. (see inner - # operator below for details) + # Note: This output is just an alias for CachedOutput, because all + # implementations use internal caches. Output = OutputSlot() ## Cached label image - # Access the internal cache. If you were planning to cache the labeled - # volume, be sure to use this slot, since it makes use of the internal - # cache that might be in use anyway. + # Axistags and shape are the same as on the Input, dtype is an integer + # datatype. CachedOutput = OutputSlot() name = "OpLabelVolume" @@ -105,7 +98,7 @@ def setupOutputs(self): self._op5_2_cached.Input.connect(self._opLabel.CachedOutput) # set the final reordering operator's AxisOrder to that of the input - origOrder = "".join([s for s in self.Input.meta.getTaggedShape()]) + origOrder = self.Input.meta.getAxisKeys() self._op5_2.AxisOrder.setValue(origOrder) self._op5_2_cached.AxisOrder.setValue(origOrder) @@ -145,6 +138,8 @@ def _setBG(self): ## parent class for all connected component labeling implementations class OpLabelingABC(Operator): + __metaclass__ = ABCMeta + ## input with axes 'xyzct' Input = InputSlot() @@ -154,6 +149,11 @@ class OpLabelingABC(Operator): Output = OutputSlot() CachedOutput = OutputSlot() + ## list of supported dtypes + @abstractproperty + def supportedDtypes(self): + pass + def __init__(self, *args, **kwargs): super(OpLabelingABC, self).__init__(*args, **kwargs) self._cache = None @@ -162,6 +162,15 @@ def __init__(self, *args, **kwargs): def setupOutputs(self): labelType = np.uint32 + # check if the input dtype is valid + if self.Input.ready(): + dtype = self.Input.meta.dtype + if dtype not in self.supportedDtypes: + msg = "{}: dtype '{}' not supported "\ + "with method 'vigra'. Supported types: {}" + msg = msg.format(self.name, dtype, self.supportedDtypes) + raise ValueError(msg) + # remove unneeded old cache if self._cache is not None: self._cache.Input.disconnect() @@ -181,11 +190,9 @@ def setupOutputs(self): self.Output.meta.assignFrom(self._cache.Output.meta) self.CachedOutput.meta.assignFrom(self._cache.Output.meta) + # prepare locks for each channel and time slice s = self.Input.meta.getTaggedShape() shape = (s['c'], s['t']) - self._cached = np.zeros(shape) - - # prepare locks for each channel and time slice locks = np.empty(shape, dtype=np.object) for c in range(s['c']): for t in range(s['t']): @@ -204,7 +211,9 @@ def propagateDirty(self, slot, subindex, roi): self.CachedOutput.setDirty(outroi) def execute(self, slot, subindex, roi, result): - #FIXME we don't care right now which slot is requested, just return cached CC + #FIXME we don't care right now which slot is requested, just return + # cached CC (all implementations cache anyways) + # get the background values bg = self.Background[...].wait() bg = vigra.taggedView(bg, axistags=self.Background.meta.axistags) @@ -216,18 +225,28 @@ def execute(self, slot, subindex, roi, result): # do labeling in parallel over channels and time slices pool = RequestPool() + ## function for request ## + def singleSliceRequest(start3d, stop3d, c, t, bg): + # computing CC is a critical section + self._locks[c, t].acquire() + + # update the slice + self._updateCache(start3d, stop3d, c, t, bg) + + # leave the critical section + self._locks[c, t].release() + ## end function for request ## + for t in range(roi.start[4], roi.stop[4]): for c in range(roi.start[3], roi.stop[3]): - # only one thread may update the cache for this c and t, other requests - # must wait until labeling is finished - self._locks[c, t].acquire() - if self._cached[c, t]: - # this slice is already computed - continue # update the whole slice - req = Request(partial(self._updateSlice, c, t, bg[c, t])) + req = Request(partial(singleSliceRequest, + roi.start[:3], roi.stop[:3], + c, t, bg[c, t])) pool.add(req) + logger.debug("{}: Computing connected components for ROI {}".format( + self.name, roi)) pool.wait() pool.clean() @@ -235,39 +254,57 @@ def execute(self, slot, subindex, roi, result): req.writeInto(result) req.block() - # release locks and set caching flags - for t in range(roi.start[4], roi.stop[4]): - for c in range(roi.start[3], roi.stop[3]): - self._cached[c, t] = 1 - self._locks[c, t].release() - - ## compute the requested slice and put the results into self._cache + ## compute the requested roi and put the results into self._cache # + @abstractmethod + def _updateCache(self, start3d, stop3d, c, t, bg): + pass + + +class OpNonLazyCC(OpLabelingABC): + + def setupOutputs(self): + if self.Input.ready(): + s = self.Input.meta.getTaggedShape() + shape = (s['c'], s['t']) + self._cached = np.zeros(shape, dtype=np.bool) + super(OpNonLazyCC, self).setupOutputs() + + ## wraps the childrens' updateSlice function to check if recomputation is + ## needed + def _updateCache(self, start3d, stop3d, c, t, bg): + # we compute the whole slice, regardless of actual roi, if needed + if self._cached[c, t]: + return + self._updateSlice(c, t, bg) + self._cached[c, t] = True + + @abstractmethod def _updateSlice(self, c, t, bg): - raise NotImplementedError("This is an abstract method") + pass ## vigra connected components -class _OpLabelVigra(OpLabelingABC): +class _OpLabelVigra(OpNonLazyCC): name = "OpLabelVigra" + supportedDtypes = [np.uint8, np.uint32, np.float32] - def setupOutputs(self): - if self.Input.ready(): - supportedDtypes = [np.uint8, np.uint32, np.float32] - dtype = self.Input.meta.dtype - if dtype not in supportedDtypes: - msg = "OpLabelVolume: dtype '{}' not supported "\ - "with method 'vigra'. Supported types: {}" - msg = msg.format(dtype, supportedDtypes) - raise ValueError(msg) - super(_OpLabelVigra, self).setupOutputs() + def propagateDirty(self, slot, subindex, roi): + # set the cache to dirty + self._cached[roi.start[3]:roi.stop[3], + roi.start[4]:roi.stop[4]] = 0 + super(_OpLabelVigra, self).propagateDirty(slot, subindex, roi) def _updateSlice(self, c, t, bg): source = vigra.taggedView(self.Input[..., c, t].wait(), axistags='xyzct') source = source.withAxes(*'xyz') - result = vigra.analysis.labelVolumeWithBackground( - source, background_value=int(bg)) + if source.shape[2] > 1: + result = vigra.analysis.labelVolumeWithBackground( + source, background_value=int(bg)) + else: + result = vigra.analysis.labelImageWithBackground( + source[..., 0], background_value=int(bg)) result = result.withAxes(*'xyzct') stop = np.asarray(self.Input.meta.shape) @@ -279,22 +316,37 @@ def _updateSlice(self, c, t, bg): self._cache.setInSlot(self._cache.Input, (), roi, result) -## blockedarray connected components -class _OpLabelBlocked(OpLabelingABC): +# try to import the blockedarray module, fail only if neccessary +try: + from blockedarray import OpBlockedConnectedComponents +except ImportError as e: + _blockedarray_module_available = False + _importMsg = str(e) + class OpBlockedConnectedComponents(object): + pass +else: + _blockedarray_module_available = True + _importMsg = "No error, importing blockedarray worked." + +def haveBlocked(): + return _blockedarray_module_available + + +## Wrapper for blockedarray.OpBlockedConnectedComponents +# This wrapper takes care that the module is indeed imported, and sets the +# block shape for the cache. +class _OpLabelBlocked(OpBlockedConnectedComponents): name = "OpLabelBlocked" def _updateSlice(self, c, t, bg): assert _blockedarray_module_available,\ - "Failed to import blockedarray: {}".format(_importMsg) - assert bg == 0,\ - "Blocked Labeling not implemented for background value {}".format(bg) + "Failed to import blockedarray. Message was: {}".format(_importMsg) - blockShape = _findBlockShape(self.Input.meta.shape[:3]) + blockShape = _findBlockShape(self.Input.meta.shape[:3]) + (1, 1) + logger.debug("{}: Using blockshape {}".format(self.name, blockShape)) + self._cache.BlockShape.setValue(blockShape) + super(_OpLabelBlocked, self)._updateSlice(c, t, bg) - source = _Source(self.Input, blockShape, c, t) - sink = _Sink(self._cache, c, t) - cc = blockedarray.dim3.ConnectedComponents(source, blockShape) - cc.writeToSink(sink) ## Feeds meta data into OpCompressedCache @@ -320,68 +372,6 @@ def setMeta(self, meta): self.Output.meta.assignFrom(meta) -if _blockedarray_module_available: - - def fullRoiFromPQ(slot, p, q, c, t): - fullp = np.zeros((5,), dtype=np.int) - fullq = np.zeros((5,), dtype=np.int) - fullp[:3] = p - fullq[:3] = q - fullp[3] = c - fullp[4] = t - fullq[3] = c+1 - fullq[4] = t+1 - return SubRegion(slot, start=fullp, stop=fullq) - - class _Source(blockedarray.adapters.SourceABC): - def __init__(self, slot, blockShape, c, t): - super(_Source, self).__init__() - self._slot = slot - self._blockShape = blockShape # is passed to blockedarray! - self._p = np.asarray(slot.meta.shape[:3], dtype=np.long)*0 - self._q = np.asarray(slot.meta.shape[:3], dtype=np.long) - self._c = c - self._t = t - - def pySetRoi(self, roi): - assert len(roi) == 2 - self._p = np.asarray(roi[0], dtype=np.long) - self._q = np.asarray(roi[1], dtype=np.long) - - def pyShape(self): - return tuple(self._slot.meta.shape[:3]) - - def pyReadBlock(self, roi, output): - assert len(roi) == 2 - roiP = np.asarray(roi[0]) - roiQ = np.asarray(roi[1]) - p = self._p + roiP - q = p + roiQ - roiP - if np.any(q > self._q): - raise IndexError("Requested roi is too large for selected " - "roi (previous call to setRoi)") - sub = fullRoiFromPQ(self._slot, p, q, self._c, self._t) - req = self._slot.get(sub) - req.writeInto(output[..., np.newaxis, np.newaxis]) - req.block() - return True - - class _Sink(blockedarray.adapters.SinkABC): - def __init__(self, op, c, t): - super(_Sink, self).__init__() - self._op = op - self._c = c - self._t = t - - def pyWriteBlock(self, roi, block): - assert len(roi) == 2 - roiP = np.asarray(roi[0]) - roiQ = np.asarray(roi[1]) - sub = fullRoiFromPQ(self._op.Input, roiP, roiQ, self._c, self._t) - self._op.setInSlot(self._op.Input, (), sub, - block[..., np.newaxis, np.newaxis]) - - ## find a good block shape for given input shape # Set the block shape such that it # (a) divides the volume shape evenly diff --git a/tests/testOpLabelVolume.py b/tests/testOpLabelVolume.py index 161f5f160..58097aba5 100644 --- a/tests/testOpLabelVolume.py +++ b/tests/testOpLabelVolume.py @@ -11,12 +11,7 @@ from numpy.testing import assert_array_equal, assert_array_almost_equal -try: - import blockedarray -except ImportError: - have_blocked = False -else: - have_blocked = True +from lazyflow.operators.opLabelVolume import haveBlocked class TestVigra(unittest.TestCase): @@ -81,6 +76,33 @@ def testMultiDim(self): print("t={}, c={}".format(t, c)) assertEquivalentLabeling(blocks[..., c, t], out[..., c, t]) + def testSingletonZ(self): + vol = np.zeros((82, 70, 1, 5, 5), dtype=np.uint8) + vol = vigra.taggedView(vol, axistags='xyzct') + + blocks = np.zeros(vol.shape, dtype=np.uint8) + blocks[30:50, 40:60, :, 2:4, 3:5] = 1 + blocks[30:50, 40:60, :, 2:4, 0:2] = 2 + blocks[60:70, 30:40, :, :, :] = 3 + + vol[blocks == 1] = 255 + vol[blocks == 2] = 255 + vol[blocks == 3] = 255 + + op = OpLabelVolume(graph=Graph()) + op.Method.setValue(self.method) + op.Input.setValue(vol) + + out = op.Output[...].wait() + tags = op.Output.meta.getTaggedShape() + print(tags) + out = vigra.taggedView(out, axistags="".join([s for s in tags])) + + for c in range(out.shape[3]): + for t in range(out.shape[4]): + print("t={}, c={}".format(t, c)) + assertEquivalentLabeling(blocks[..., c, t], out[..., c, t]) + def testConsistency(self): vol = np.zeros((1000, 100, 10)) vol = vol.astype(np.uint8) @@ -192,19 +214,20 @@ def testBackground(self): assertEquivalentLabeling(vol[..., c, t], out.squeeze()) -@unittest.skipIf(not have_blocked, "Cannot test blockedarray because you don't have the module") +@unittest.skipIf(not haveBlocked(), "Cannot test blockedarray because you don't have the module") class TestBlocked(TestVigra): def setUp(self): self.method = np.asarray(['blocked'], dtype=np.object) - @unittest.skip("Not implemented yet") - def testUnsupported(self): - pass + #@unittest.skip("Not implemented yet") + #def testUnsupported(self): + #pass - @unittest.skip("Not implemented yet") + # background value is unsupported for blocked labeling + @unittest.expectedFailure def testBackground(self): - pass + super(TestBlocked, self).testBackground() def assertEquivalentLabeling(x, y):