Behaviour of parDistributeScan
There are currently 3 issues with parDistributeScan:
- sendToWorker_ errors out when there is no space in the input buffer instead of blocking
- Non-terminating scans + finite streams result in a deadlock
- Terminating scans with a limited input buffer result in a deadlock
sendToWorker_
{-# INLINE sendToWorker_ #-}
sendToWorker_ :: MonadAsync m => Channel m a b -> a -> m ()
sendToWorker_ chan a = go

    where

    -- Recursive function, should we use SPEC?
    go = do
        r <- isBufferAvailable chan
        if r
        then do
            liftIO
                $ void
                $ sendEvent
                    (inputQueue chan)
                    (inputItemDoorBell chan)
                    (ChildYield a)
        else do
            error "sendToWorker_: No space available in the buffer"
            -- Block for space
            -- () <- liftIO $ takeMVar (inputSpaceDoorBell chan)
            -- go
We deliberately error out here.
Solution
I don't see a reason why we should not block on input. There is a possibility that the error case was used for testing. We should uncomment the blocking code and remove the error part.
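To illustrate the blocking behaviour, here is a minimal, self-contained sketch. It does not use the real Channel type: BoundedQueue, tryEnqueue, sendBlocking, receive and demoBlockingSend are hypothetical names invented for this example, and it assumes a single producer and a single consumer.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (void)
import Data.IORef (IORef, newIORef, atomicModifyIORef')

-- Hypothetical stand-in for the real Channel: a bounded queue plus a
-- doorbell that the consumer rings whenever it frees up space.
data BoundedQueue a = BoundedQueue
    { bqLimit :: Int
    , bqItems :: IORef [a]        -- newest item first
    , bqSpaceDoorBell :: MVar ()
    }

newBoundedQueue :: Int -> IO (BoundedQueue a)
newBoundedQueue n = BoundedQueue n <$> newIORef [] <*> newEmptyMVar

-- Enqueue if there is space; returns True on success.
tryEnqueue :: BoundedQueue a -> a -> IO Bool
tryEnqueue q a =
    atomicModifyIORef' (bqItems q) $ \xs ->
        if length xs < bqLimit q then (a : xs, True) else (xs, False)

-- Blocking send: wait on the doorbell and retry instead of calling 'error'.
sendBlocking :: BoundedQueue a -> a -> IO ()
sendBlocking q a = go
  where
    go = do
        ok <- tryEnqueue q a
        if ok
        then return ()
        else do
            takeMVar (bqSpaceDoorBell q)
            go

-- Consumer side: take the oldest item, then ring the space doorbell.
receive :: BoundedQueue a -> IO (Maybe a)
receive q = do
    r <- atomicModifyIORef' (bqItems q) $ \xs ->
        case reverse xs of
            [] -> ([], Nothing)
            (y : ys) -> (reverse ys, Just y)
    void $ tryPutMVar (bqSpaceDoorBell q) ()
    return r

-- A one-slot queue: the second send has to wait for the consumer.
demoBlockingSend :: IO (Maybe Int)
demoBlockingSend = do
    q <- newBoundedQueue 1
    sendBlocking q 1
    _ <- forkIO $ threadDelay 10000 >> void (receive q)
    sendBlocking q 2      -- blocks until the consumer frees the slot
    receive q
```

The retry after takeMVar matters: the doorbell may hold a stale ring from an earlier receive, so the sender re-checks for space rather than assuming it.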
Non-terminating Scans + Finite Streams
The loop of parDistributeScan works in the following way once the stream has
terminated:
- Collect the outputs
  - Check for outputs from the list of scan channels: [Channel].
  - Remove any terminated scans from the list of scan channels.
  - Return the outputs.
- Check if we have running scans
When we have scans that run forever, we always have running scans. Once the stream has terminated, the scans receive no more input, so all scanning stops and there is no further output. Since there is no output, the output doorbell is never rung, and we block indefinitely on it.
Solution
We already communicate that the input stream has ended by sending a
ChildStopEvent.
Stop -> do
    Prelude.mapM_ finalize (fmap fst running)
    return $ ScanDrain q db running
finalize sends a ChildStopEvent to all the scan channels. Although we end
the stream after receiving a ChildStopEvent, we don't end the scan.
The most straightforward solution is to send raw events to the scan and pass the
control flow to the scan itself. We can end the scan once the manager scan
receives a ChildStopEvent.
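The idea of passing raw events to the scan can be sketched as follows. This is only an illustration under stated assumptions: InEvent, OutEvent, sumWorker and runScan are hypothetical names loosely modelled on ChildYield and ChildStopEvent, and unbounded Chans stand in for the real channel queues and doorbells.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

-- Hypothetical event types, not the real streamly ones.
data InEvent a = InYield a | InStop
data OutEvent b = OutYield b | OutDone

-- A worker that sees raw events. The scan step itself (a running sum)
-- never terminates, so the worker ends only when it sees InStop.
sumWorker :: Chan (InEvent Int) -> Chan (OutEvent Int) -> IO ()
sumWorker inp out = go 0
  where
    go acc = do
        ev <- readChan inp
        case ev of
            InYield x -> do
                let acc' = acc + x
                writeChan out (OutYield acc')
                go acc'
            -- Announce termination so the driver is not left waiting.
            InStop -> writeChan out OutDone

-- Driver: feed a finite stream, send the stop event, then drain outputs
-- until the worker reports that it is done.
runScan :: [Int] -> IO [Int]
runScan xs = do
    inp <- newChan
    out <- newChan
    _ <- forkIO (sumWorker inp out)
    mapM_ (writeChan inp . InYield) xs
    writeChan inp InStop
    drain out
  where
    drain out = do
        ev <- readChan out
        case ev of
            OutYield y -> (y :) <$> drain out
            OutDone -> return []
```

Because the worker itself reacts to the stop event and reports completion, the drain loop always has a final event to wake up on, even when the scan logic would otherwise run forever.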
Terminating Scans + Limited Input Buffer
The following loop sets the context:
- Collect the outputs
- Fetch the next input from the input stream
- Send the input to all the workers using sendToWorker_
- Yield the outputs and continue.
sendToWorker_ blocks when there is no space in the input buffer. This is
generally not a problem as the input buffer is consumed most of the time. But if
the scan in question is terminated after collecting the outputs then the input
buffer is never consumed and sendToWorker_ blocks indefinitely.
Solution
We can choose to make the manager scan a non-terminating scan by default, and
only terminate it on receiving a ChildStopEvent. The caller is responsible for
sending a ChildStopEvent to the manager scan to clean it up.
This way, any extra input is always drained until the caller decides to stop sending input.
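A minimal sketch of such a manager, assuming a one-slot input buffer modelled with an MVar. Event, managerTake and demoDrain are hypothetical names for this example, and the inner scan is simply "emit the first n items, then terminate".

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Data.IORef (newIORef, modifyIORef', readIORef)

-- Hypothetical raw events, loosely modelled on ChildYield / ChildStopEvent.
data Event a = Yield a | Stop

-- Manager around a scan that terminates after emitting n items. Once the
-- inner scan is done the manager keeps draining input and discards it;
-- it ends only when it receives Stop.
managerTake :: Int -> MVar (Event a) -> (a -> IO ()) -> MVar () -> IO ()
managerTake n inp emit done = go n
  where
    go remaining = do
        ev <- takeMVar inp
        case ev of
            Stop -> putMVar done ()
            Yield x
                | remaining > 0 -> emit x >> go (remaining - 1)
                | otherwise -> go remaining   -- inner scan finished: discard

demoDrain :: IO [Int]
demoDrain = do
    inp <- newEmptyMVar       -- one-slot input buffer
    done <- newEmptyMVar
    outRef <- newIORef []
    _ <- forkIO $ managerTake 2 inp (\x -> modifyIORef' outRef (x :)) done
    -- These sends would block forever if the manager stopped reading
    -- as soon as the inner scan terminated.
    mapM_ (putMVar inp . Yield) [1 .. 10]
    putMVar inp Stop
    takeMVar done
    reverse <$> readIORef outRef
```

The cost of this design is that input sent after the inner scan has finished is consumed and thrown away, so the caller should send the stop event promptly once it notices the scan has no more output to give.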