From 045b14ba2111f13ed1da7c62ec722e387d138284 Mon Sep 17 00:00:00 2001 From: Hajime Hoshi Date: Wed, 21 Jul 2021 22:44:43 +0900 Subject: [PATCH] audio/internal/readerdriver: Reimplement Windows audio with the common players Updates #1710 Updates #1768 --- audio/internal/readerdriver/driver_windows.go | 591 +++--------------- .../{player_unix.go => player_notjs.go} | 4 +- audio/internal/readerdriver/winmm_windows.go | 4 +- 3 files changed, 91 insertions(+), 508 deletions(-) rename audio/internal/readerdriver/{player_unix.go => player_notjs.go} (99%) diff --git a/audio/internal/readerdriver/driver_windows.go b/audio/internal/readerdriver/driver_windows.go index a08b4d64d..a748545de 100644 --- a/audio/internal/readerdriver/driver_windows.go +++ b/audio/internal/readerdriver/driver_windows.go @@ -15,18 +15,14 @@ package readerdriver import ( - "io" - "runtime" + "fmt" "sync" "unsafe" "golang.org/x/sys/windows" ) -// The common players in players_unix.go are not used on Windows. -// Mixing on Go side can cause bigger delays (#1710). - -const headerBufferSize = 2048 +const headerBufferSize = 4096 func IsAvailable() bool { return true @@ -34,18 +30,18 @@ func IsAvailable() bool { type header struct { waveOut uintptr - buffer []byte + buffer []float32 waveHdr *wavehdr } -func newHeader(waveOut uintptr, bufferSize int) (*header, error) { +func newHeader(waveOut uintptr, bufferSizeInBytes int) (*header, error) { h := &header{ waveOut: waveOut, - buffer: make([]byte, bufferSize), + buffer: make([]float32, bufferSizeInBytes/4), } h.waveHdr = &wavehdr{ lpData: uintptr(unsafe.Pointer(&h.buffer[0])), - dwBufferLength: uint32(bufferSize), + dwBufferLength: uint32(bufferSizeInBytes), } if err := waveOutPrepareHeader(waveOut, h.waveHdr); err != nil { return nil, err @@ -53,10 +49,7 @@ func newHeader(waveOut uintptr, bufferSize int) (*header, error) { return h, nil } -func (h *header) Write(data []byte) error { - if n := len(h.buffer) - len(data); n > 0 { - data = append(data, make([]byte, n)...) - } +func (h *header) Write(data []float32) error { copy(h.buffer, data) if err := waveOutWrite(h.waveOut, h.waveHdr); err != nil { return err @@ -76,8 +69,17 @@ type context struct { sampleRate int channelNum int bitDepthInBytes int + + waveOut uintptr + headers []*header + + cond *sync.Cond + + players *players } +var theContext *context + func NewContext(sampleRate, channelNum, bitDepthInBytes int) (Context, chan struct{}, error) { ready := make(chan struct{}) close(ready) @@ -86,420 +88,72 @@ func NewContext(sampleRate, channelNum, bitDepthInBytes int) (Context, chan stru sampleRate: sampleRate, channelNum: channelNum, bitDepthInBytes: bitDepthInBytes, + cond: sync.NewCond(&sync.Mutex{}), + players: newPlayers(), } + theContext = c + + const bitsPerSample = 32 + nBlockAlign := c.channelNum * bitsPerSample / 8 + f := &waveformatex{ + wFormatTag: waveFormatIEEEFloat, + nChannels: uint16(c.channelNum), + nSamplesPerSec: uint32(c.sampleRate), + nAvgBytesPerSec: uint32(c.sampleRate * nBlockAlign), + wBitsPerSample: bitsPerSample, + nBlockAlign: uint16(nBlockAlign), + } + + // TOOD: What about using an event instead of a callback? PortAudio and other libraries do that. + w, err := waveOutOpen(f, waveOutOpenCallback) + const elementNotFound = 1168 + if e, ok := err.(*winmmError); ok && e.errno == elementNotFound { + // TODO: No device was found. Return the dummy device (hajimehoshi/oto#77). + // TODO: Retry to open the device when possible. + return nil, nil, err + } + if err != nil { + return nil, nil, err + } + + c.waveOut = w + c.headers = make([]*header, 0, 4) + for len(c.headers) < cap(c.headers) { + h, err := newHeader(c.waveOut, headerBufferSize) + if err != nil { + return nil, nil, err + } + c.headers = append(c.headers, h) + } + + go c.loop() + return c, ready, nil } func (c *context) Suspend() error { - return thePlayers.suspend() + if err := waveOutPause(c.waveOut); err != nil { + return err + } + return nil } func (c *context) Resume() error { - return thePlayers.resume() -} + // TODO: Ensure at least one header is queued? -type players struct { - players map[uintptr]*playerImpl - toResume map[*playerImpl]struct{} - cond *sync.Cond -} - -func (p *players) add(player *playerImpl, waveOut uintptr) { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - if p.players == nil { - p.players = map[uintptr]*playerImpl{} - } - runLoop := len(p.players) == 0 - p.players[waveOut] = player - if runLoop { - // Use the only one loop. Windows' context switching is not efficent and - // using too many goroutines might be problematic. - go p.loop() - } -} - -func (p *players) remove(waveOut uintptr) { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - pl, ok := p.players[waveOut] - if !ok { - return - } - delete(p.players, waveOut) - delete(p.toResume, pl) - - p.cond.Signal() -} - -func (p *players) suspend() error { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - for _, pl := range p.players { - if !pl.IsPlaying() { - continue - } - pl.Pause() - if p.toResume == nil { - p.toResume = map[*playerImpl]struct{}{} - } - p.toResume[pl] = struct{}{} + if err := waveOutRestart(c.waveOut); err != nil { + return err } return nil } -func (p *players) resume() error { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - for pl := range p.toResume { - pl.Play() - delete(p.toResume, pl) - } - return nil -} - -func (p *players) shouldWait() bool { - if len(p.players) == 0 { - return false - } - - for _, pl := range p.players { - if pl.canProceed() { - return false +func (c *context) isHeaderAvailable() bool { + for _, h := range c.headers { + if !h.IsQueued() { + return true } } - return true -} - -func (p *players) wait() bool { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - for p.shouldWait() { - p.cond.Wait() - } - return len(p.players) > 0 -} - -func (p *players) loop() { - for { - if !p.wait() { - return - } - p.cond.L.Lock() - for _, pl := range p.players { - pl.readAndWriteBuffer() - } - p.cond.L.Unlock() - } -} - -var thePlayers = players{ - cond: sync.NewCond(&sync.Mutex{}), -} - -type player struct { - p *playerImpl -} - -type playerImpl struct { - context *context - src io.Reader - err error - waveOut uintptr - state playerState - headers []*header - buf []byte - eof bool - volume float64 - - m sync.Mutex -} - -func (c *context) NewPlayer(src io.Reader) Player { - p := &player{ - p: &playerImpl{ - context: c, - src: src, - volume: 1, - }, - } - runtime.SetFinalizer(p, (*player).Close) - return p -} - -func (p *player) Err() error { - return p.p.Err() -} - -func (p *playerImpl) Err() error { - p.m.Lock() - defer p.m.Unlock() - return p.err -} - -func (p *player) Play() { - p.p.Play() -} - -func (p *playerImpl) Play() { - // Call Play asynchronously since playImpl might take long. - ch := make(chan struct{}) - go func() { - p.m.Lock() - defer p.m.Unlock() - close(ch) - p.playImpl() - }() - - // Wait until the mutex is locked in the above goroutine. - <-ch -} - -func (p *playerImpl) playImpl() { - if p.err != nil { - return - } - if p.state != playerPaused { - return - } - - if p.waveOut == 0 { - numBlockAlign := p.context.channelNum * p.context.bitDepthInBytes - f := &waveformatex{ - wFormatTag: waveFormatPCM, - nChannels: uint16(p.context.channelNum), - nSamplesPerSec: uint32(p.context.sampleRate), - nAvgBytesPerSec: uint32(p.context.sampleRate * numBlockAlign), - wBitsPerSample: uint16(p.context.bitDepthInBytes * 8), - nBlockAlign: uint16(numBlockAlign), - } - - // TOOD: What about using an event instead of a callback? PortAudio and other libraries do that. - w, err := waveOutOpen(f, waveOutOpenCallback) - const elementNotFound = 1168 - if e, ok := err.(*winmmError); ok && e.errno == elementNotFound { - // TODO: No device was found. Return the dummy device (hajimehoshi/oto#77). - // TODO: Retry to open the device when possible. - p.setErrorImpl(err) - return - } - if err != nil { - p.setErrorImpl(err) - return - } - - p.waveOut = w - p.headers = make([]*header, 0, 6) - for len(p.headers) < cap(p.headers) { - h, err := newHeader(p.waveOut, headerBufferSize) - if err != nil { - p.setErrorImpl(err) - return - } - p.headers = append(p.headers, h) - } - - thePlayers.add(p, p.waveOut) - } - - if p.eof && len(p.buf) == 0 { - return - } - - // Set the state first as readAndWriteBufferImpl checks the current player state. - p.state = playerPlay - - // Call readAndWriteBufferImpl to ensure at least one header is queued. - p.readAndWriteBufferImpl() - - if err := waveOutRestart(p.waveOut); err != nil { - p.setErrorImpl(err) - return - } - - // Switching goroutines is very inefficient on Windows. Avoid a dedicated goroutine for a player. -} - -func (p *playerImpl) queuedHeadersNum() int { - var c int - for _, h := range p.headers { - if h.IsQueued() { - c++ - } - } - return c -} - -func (p *playerImpl) canProceed() bool { - p.m.Lock() - defer p.m.Unlock() - return p.queuedHeadersNum() < len(p.headers) && p.state == playerPlay -} - -func (p *player) Pause() { - p.p.Pause() -} - -func (p *playerImpl) Pause() { - p.m.Lock() - defer p.m.Unlock() - p.pauseImpl() -} - -func (p *playerImpl) pauseImpl() { - if p.err != nil { - return - } - if p.state != playerPlay { - return - } - if p.waveOut == 0 { - return - } - - // waveOutPause never return when there is no queued header. - if p.queuedHeadersNum() > 0 { - if err := waveOutPause(p.waveOut); err != nil { - p.setErrorImpl(err) - return - } - } - - p.state = playerPaused -} - -func (p *player) Reset() { - p.p.Reset() -} - -func (p *playerImpl) Reset() { - p.m.Lock() - defer p.m.Unlock() - p.resetImpl() -} - -func (p *playerImpl) resetImpl() { - if p.err != nil { - return - } - if p.state == playerClosed { - return - } - if p.waveOut == 0 { - return - } - - // waveOutReset and waveOutPause never return when there is no queued header. - if p.queuedHeadersNum() > 0 { - err := waveOutReset(p.waveOut) - if err != nil { - p.setErrorImpl(err) - return - } - - err = waveOutPause(p.waveOut) - if err != nil { - p.setErrorImpl(err) - return - } - } - - // Now all the headers are WHDR_DONE. Recreate the headers. - for i, h := range p.headers { - if err := h.Close(); err != nil { - p.setErrorImpl(err) - return - } - h, err := newHeader(p.waveOut, headerBufferSize) - if err != nil { - p.setErrorImpl(err) - return - } - p.headers[i] = h - } - - p.state = playerPaused - p.buf = p.buf[:0] - p.eof = false -} - -func (p *player) IsPlaying() bool { - return p.p.IsPlaying() -} - -func (p *playerImpl) IsPlaying() bool { - p.m.Lock() - defer p.m.Unlock() - return p.state == playerPlay -} - -func (p *player) Volume() float64 { - return p.p.Volume() -} - -func (p *playerImpl) Volume() float64 { - p.m.Lock() - defer p.m.Unlock() - return p.volume -} - -func (p *player) SetVolume(volume float64) { - p.p.SetVolume(volume) -} - -func (p *playerImpl) SetVolume(volume float64) { - p.m.Lock() - defer p.m.Unlock() - p.volume = volume -} - -func (p *player) UnplayedBufferSize() int { - return p.p.UnplayedBufferSize() -} - -func (p *playerImpl) UnplayedBufferSize() int { - p.m.Lock() - defer p.m.Unlock() - return len(p.buf) -} - -func (p *player) Close() error { - runtime.SetFinalizer(p, nil) - return p.p.Close() -} - -func (p *playerImpl) Close() error { - p.m.Lock() - defer p.m.Unlock() - return p.closeImpl() -} - -func (p *playerImpl) closeImpl() error { - p.state = playerClosed - if p.waveOut != 0 { - for _, h := range p.headers { - if err := h.Close(); err != nil && p.err == nil { - p.err = err - } - } - p.headers = p.headers[:0] - if err := waveOutClose(p.waveOut); err != nil && p.err == nil { - p.err = err - } - - // This player's lock might block thePlayer's lock. Unlock this first. - p.m.Unlock() - thePlayers.remove(p.waveOut) - p.m.Lock() - - p.waveOut = 0 - } - return p.err + return false } var waveOutOpenCallback = windows.NewCallbackCDecl(func(hwo, uMsg, dwInstance, dwParam1, dwParam2 uintptr) uintptr { @@ -507,92 +161,36 @@ var waveOutOpenCallback = windows.NewCallbackCDecl(func(hwo, uMsg, dwInstance, d if uMsg != womDone { return 0 } - thePlayers.cond.Signal() + theContext.cond.Signal() return 0 }) -func (p *playerImpl) readAndWriteBuffer() { - p.m.Lock() - defer p.m.Unlock() - p.readAndWriteBufferImpl() +func (c *context) loop() { + buf32 := make([]float32, headerBufferSize/4) + for { + c.appendBuffer(buf32) + } } -func (p *playerImpl) readAndWriteBufferImpl() { - if p.state != playerPlay { - return +func (c *context) appendBuffer(buf32 []float32) { + c.cond.L.Lock() + defer c.cond.L.Unlock() + + for !c.isHeaderAvailable() { + c.cond.Wait() } - for len(p.buf) < p.context.maxBufferSize() && !p.eof { - buf := make([]byte, p.context.maxBufferSize()) - n, err := p.src.Read(buf) - if err != nil && err != io.EOF { - p.setErrorImpl(err) - return - } - p.buf = append(p.buf, buf[:n]...) - if err == io.EOF { - if len(p.buf) == 0 { - p.eof = true - } - break - } + for i := range buf32 { + buf32[i] = 0 } + c.players.read(buf32) - for _, h := range p.headers { - if len(p.buf) == 0 { - break - } + for _, h := range c.headers { if h.IsQueued() { continue } - n := headerBufferSize - if n > len(p.buf) { - n = len(p.buf) - } - buf := p.buf[:n] - - // Adjust the volume - if p.volume < 1 { - switch p.context.bitDepthInBytes { - case 1: - const ( - max = 127 - min = -128 - offset = 128 - ) - for i, b := range buf { - x := int16(b) - offset - x = int16(float64(x) * p.volume) - if x > max { - x = max - } - if x < min { - x = min - } - buf[i] = byte(x + offset) - } - case 2: - const ( - max = (1 << 15) - 1 - min = -(1 << 15) - ) - for i := 0; i < n/2; i++ { - x := int32(int16(buf[2*i]) | (int16(buf[2*i+1]) << 8)) - x = int32(float64(x) * p.volume) - if x > max { - x = max - } - if x < min { - x = min - } - buf[2*i] = byte(x) - buf[2*i+1] = byte(x >> 8) - } - } - } - - if err := h.Write(buf); err != nil { + if err := h.Write(buf32); err != nil { // This error can happen when e.g. a new HDMI connection is detected (hajimehoshi/oto#51). const errorNotFound = 1168 if werr := err.(*winmmError); werr.fname == "waveOutWrite" { @@ -603,24 +201,9 @@ func (p *playerImpl) readAndWriteBufferImpl() { // TODO: Retry later. } } - p.setErrorImpl(err) - return + // TODO: Treat the error corretly + panic(fmt.Errorf("readerdriver: Queueing the header failed: %v", err)) } - - p.buf = p.buf[n:] - - // 4 is an arbitrary number that doesn't cause a problem at examples/piano (#1653). - if p.queuedHeadersNum() >= 4 { - break - } - } - - if p.queuedHeadersNum() == 0 && p.eof { - p.pauseImpl() + return } } - -func (p *playerImpl) setErrorImpl(err error) { - p.err = err - p.closeImpl() -} diff --git a/audio/internal/readerdriver/player_unix.go b/audio/internal/readerdriver/player_notjs.go similarity index 99% rename from audio/internal/readerdriver/player_unix.go rename to audio/internal/readerdriver/player_notjs.go index e76aeba8a..6a0ba3110 100644 --- a/audio/internal/readerdriver/player_unix.go +++ b/audio/internal/readerdriver/player_notjs.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build !js && !windows -// +build !js,!windows +//go:build !js +// +build !js package readerdriver diff --git a/audio/internal/readerdriver/winmm_windows.go b/audio/internal/readerdriver/winmm_windows.go index d8d97eaa5..9743ffd83 100644 --- a/audio/internal/readerdriver/winmm_windows.go +++ b/audio/internal/readerdriver/winmm_windows.go @@ -59,8 +59,8 @@ type waveformatex struct { } const ( - waveFormatPCM = 1 - whdrInqueue = 16 + waveFormatIEEEFloat = 3 + whdrInqueue = 16 ) type mmresult uint