Behaviour of parDistributeScan

There are currently 3 issues with parDistributeScan.

  1. sendToWorker_ errors out when there is no spacce in the input buffer instead of blocking
  2. Non-terminating scans + Finite streams results in a deadlock
  3. Terminating scans with limited input buffer results in a deadlock

sendToWorker_

{-# INLINE sendToWorker_ #-}
sendToWorker_ :: MonadAsync m => Channel m a b -> a -> m ()
sendToWorker_ chan a = go

    where

    -- Recursive function, should we use SPEC?
    go = do
        r <- isBufferAvailable chan
        if r
        then do
            liftIO
                $ void
                $ sendEvent
                    (inputQueue chan)
                    (inputItemDoorBell chan)
                    (ChildYield a)
        else do
            error "sendToWorker_: No space available in the buffer"
            -- Block for space
            -- () <- liftIO $ takeMVar (inputSpaceDoorBell chan)
            -- go

We deliberately error out here.

Solution

I don't see a reason why we should not block on input. There is a possibility that the error case was used for testing. We should uncomment the blocking code and remove the error part.

Non-terminating Scans + Finite Streams

The loop of parDistributeScan works in the following way once the stream has terminated:

  • Collect the outputs
    • Check for outputs from the list of scan channels: [Channel].
    • Remove any terminated scans from the list of scan channels.
    • Return the outputs.
  • Check if we have running scans
    • If we have running scans, check for outputs
      • If we have outputs yield them
      • If we don't have outputs, and then yield the outputs
    • If there are no running scans, yield the outputs and end the loop

When we have scans that run forever we always have running scans. Once the stream is terminated, all the scanning will have terminated and there will be no output. Since there will be no output the output doorbell is never rung so we are blocked indefinitely on the output door bell.

Solution

We already communicate with the the input stream had ended by sending a ChildStopEvent.

Stop -> do
    Prelude.mapM_ finalize (fmap fst running)
    return $ ScanDrain q db running

finalize sends a ChildStopEvent to all the scans channels. Although we end the stream after reciving a ChildStopEvent, we don't end the Scan.

The most straightworfard solution is to send raw events to the scan and pass the control flow to the scan itself. We can end the scan once the manager scan receives a ChildStopEvent.

Terminating Scans + Limited Input Buffer

The following loop sets the context:

  • Collect the outputs
  • Fetch the next input from the input stream
  • Send the input to all the workers using ​sendToWorker_
  • Yield the outputs and continue.

sendToWorker_ blocks when there is no space in the input buffer. This is generally not a problem as the input buffer is consumed most of the time. But if the scan in question is terminated after collecting the outputs then the input buffer is never consumed and sendToWorker_ blocks indefinitely.

Solution

We can choose to make the manager scan a non-terminating scan by default. And only terminate it on receiving a ChildStopEvent. The caller is responsible for sending a ChildStopEvent to the manager fold to cleaning it up.

This way, the extra input is always drained before the caller decides to not send any more input.

Update