Channel With Infinite Buffer in Golang

In Golang we have buffered and unbuffered channels. A buffered channel can hold up to x values “in transit” before further writes block. A reader first has to take a value off the other end to “make room” before more can be put in. The capacity x has to be specified when the channel is created and is fixed from then on.
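To make the blocking concrete, here is a small sketch. The helper tryFill is made up for this illustration; it uses a select with a default case so that the program reports a full buffer instead of hanging.

```go
package main

import "fmt"

// tryFill is a hypothetical helper: it attempts n non-blocking sends on a
// channel with the given buffer capacity and reports how many were accepted.
func tryFill(capacity, n int) int {
  ch := make(chan int, capacity)
  accepted := 0
  for i := 0; i < n; i++ {
    select {
    case ch <- i:
      accepted++
    default:
      // The buffer is full. A plain `ch <- i` would block here
      // until somebody reads from ch.
    }
  }
  return accepted
}

func main() {
  fmt.Println(tryFill(3, 5)) // prints 3: only three values fit
}
```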

Sometimes one wants the readers and the writers of the channel to be completely independent of each other, so that it is always possible to write, regardless of the speed of reading. For that to happen with a buffered channel, x would have to be very, very large, and even then the buffer could still fill up. This is undesirable.

Jon Bodner described a much better solution in this article on Medium, which uses two channels with a slice in between. There are some tricky details, so go read it before you copy and paste the final result shown below. I am saving it here to make sure it stays up and that I can find it again a few years from now. Also, the text below is actual text that you can copy and paste.

One does not need to use an empty interface like he did, of course. In general a channel has a concrete type, since you know beforehand what will flow through it. Here we will pretend that the channel is of type *Thing and that Thing has an id of type string that we can get by calling Thing.Id(), because that happens to be the use case I have in front of me right now.
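Thing itself is not shown in this post. A minimal stand-in could look like the sketch below; the unexported id field and the NewThing constructor are assumptions, chosen only so that the code and tests further down have something to compile against.

```go
package main

import "fmt"

// Thing is a hypothetical payload type; only its string id matters here.
type Thing struct {
  id string
}

// NewThing builds a Thing with the given id (assumed constructor).
func NewThing(id string) *Thing {
  return &Thing{id: id}
}

// Id returns the id the Thing was created with.
func (th *Thing) Id() string {
  return th.id
}

func main() {
  fmt.Println(NewThing("id-1").Id()) // prints id-1
}
```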

func MakeUnboundedQueue() (chan<- *Thing, <-chan *Thing) {
  in := make(chan *Thing)
  out := make(chan *Thing)

  go func() {

    var inQueue []*Thing

    outCh := func() chan *Thing {
      if len(inQueue) == 0 {
        return nil
      }

      return out
    }

    cur := func() *Thing {
      if len(inQueue) == 0 {
        return nil
      }

      return inQueue[0]
    }

    for len(inQueue) > 0 || in != nil {
      select {
      case oc, ok := <-in:
        if !ok {
          in = nil
        } else {
          inQueue = append(inQueue, oc)
        }
      case outCh() <- cur():
        // This case can only fire when the queue is non-empty (outCh() is
        // nil otherwise), so dropping the head is safe.
        inQueue = inQueue[1:]
      }
    }

    close(out)
  }()

  return in, out
}
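One of the tricky details in the loop above is that a send on a nil channel blocks forever, so a select never chooses that case. That is why outCh() returns nil while the queue is empty: it switches the send case off. The value expression is still evaluated every time, which is why cur() has to guard against an empty slice too. A minimal sketch of the nil channel behaviour in isolation, where pick is a made-up helper and not part of the article's code:

```go
package main

import "fmt"

// pick is a hypothetical helper that reports whether a send on ch could
// proceed immediately. The default case keeps the demo from blocking.
func pick(ch chan int) string {
  select {
  case ch <- 1:
    return "sent"
  default:
    return "skipped"
  }
}

func main() {
  fmt.Println(pick(nil))              // prints skipped: a nil channel is never ready
  fmt.Println(pick(make(chan int, 1))) // prints sent: there is room in the buffer
}
```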

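And the corresponding tests. They need fmt, sync, testing and time from the standard library; test.Equals is an assertion helper that is not shown here.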


func TestMakeUnboundedQueue(t *testing.T) {
  count := 10
  idFormat := "id-%d"

  for _, tc := range []struct {
    name       string
    writeDelay bool
    readDelay  bool
  }{
    {
      name: "no delay",
    },
    {
      name:       "slow write",
      writeDelay: true,
    },
    {
      name:      "slow read",
      readDelay: true,
    },
    {
      name:       "slow read and write",
      writeDelay: true,
      readDelay:  true,
    },
  } {
    t.Run(tc.name, func(t *testing.T) {
      in, out := MakeUnboundedQueue()
      var lastIntId int
      lastId := fmt.Sprintf(idFormat, lastIntId)

      var wg sync.WaitGroup
      wg.Add(1)

      go func() {
        for o := range out {
          if tc.readDelay {
            time.Sleep(50 * time.Millisecond)
          }
          lastIntId++
          lastId = fmt.Sprintf(idFormat, lastIntId)
          test.Equals(t, lastId, o.Id())
        }
        wg.Done()
      }()

      for i := 1; i <= count; i++ {
        if tc.writeDelay {
          time.Sleep(50 * time.Millisecond)
        }
        in <- NewThing(fmt.Sprintf(idFormat, i))
      }
      close(in)
      wg.Wait()

      test.Equals(t, fmt.Sprintf(idFormat, count), lastId)
    })
  }
}
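If you are on Go 1.18 or later, type parameters give you a third option besides the empty interface and a per-type copy. The following is my own sketch of the same pattern, not code from the article; the name MakeUnboundedQueueOf is made up, and the two helper closures are folded into the loop.

```go
package main

import "fmt"

// MakeUnboundedQueueOf is a hypothetical generic variant of
// MakeUnboundedQueue. Writes to the first channel never block for long,
// because a goroutine moves values into a growing slice.
func MakeUnboundedQueueOf[T any]() (chan<- T, <-chan T) {
  in := make(chan T)
  out := make(chan T)

  go func() {
    var queue []T

    for len(queue) > 0 || in != nil {
      // A nil sendCh disables the send case while the queue is empty.
      var sendCh chan T
      var head T
      if len(queue) > 0 {
        sendCh = out
        head = queue[0]
      }

      select {
      case v, ok := <-in:
        if !ok {
          in = nil // input closed: stop receiving, keep draining
        } else {
          queue = append(queue, v)
        }
      case sendCh <- head:
        queue = queue[1:]
      }
    }

    close(out)
  }()

  return in, out
}

func main() {
  in, out := MakeUnboundedQueueOf[string]()

  // All sends complete even though nothing is reading yet.
  for i := 1; i <= 5; i++ {
    in <- fmt.Sprintf("id-%d", i)
  }
  close(in)

  // Values come out in the order they went in.
  for v := range out {
    fmt.Println(v)
  }
}
```

Like the original, this keeps everything in memory, so a writer that is permanently faster than the reader will make the slice grow without limit.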
