From c112c31e7e5919389f86f27d4e63931ab2723228 Mon Sep 17 00:00:00 2001 From: Hajime Hoshi Date: Mon, 29 Apr 2019 02:27:12 +0900 Subject: [PATCH] audio: Reimplement audio by using multiple oto.Player --- audio/audio.go | 358 ++++++++++++++----------------------- audio/audio_test.go | 3 + audio/mux.go | 145 --------------- audio/testinternal_test.go | 6 +- go.mod | 2 +- go.sum | 4 +- 6 files changed, 139 insertions(+), 379 deletions(-) delete mode 100644 audio/mux.go diff --git a/audio/audio.go b/audio/audio.go index cc4a6e7e8..b152006d5 100644 --- a/audio/audio.go +++ b/audio/audio.go @@ -33,13 +33,17 @@ package audio import ( "bytes" + "errors" "fmt" "io" "runtime" "sync" "time" +) - "github.com/hajimehoshi/ebiten/internal/web" +const ( + channelNum = 2 + bytesPerSample = 2 * channelNum ) // A Context represents a current state of audio. @@ -51,13 +55,14 @@ import ( type Context struct { c context - mux *mux sampleRate int err error inited bool suspended bool ready bool + players map[*playerImpl]struct{} + m sync.Mutex } @@ -88,9 +93,9 @@ func NewContext(sampleRate int) (*Context, error) { c := &Context{ sampleRate: sampleRate, c: newContext(sampleRate), + players: map[*playerImpl]struct{}{}, } theContext = c - c.mux = newMux() h := getHook() h.OnSuspendAudio(func() { @@ -105,6 +110,7 @@ func NewContext(sampleRate int) (*Context, error) { }) h.AppendHookOnBeforeUpdate(func() error { + // On Android, audio loop cannot be started unless JVM is accessible. After updating one frame, JVM should exist. c.m.Lock() c.inited = true c.m.Unlock() @@ -120,8 +126,6 @@ func NewContext(sampleRate int) (*Context, error) { return err }) - go c.loop() - return c, nil } @@ -141,6 +145,13 @@ func (c *Context) playable() bool { return i && !s } +func (c *Context) hasError() bool { + c.m.Lock() + r := c.err != nil + c.m.Unlock() + return r +} + func (c *Context) setError(err error) { // TODO: What if c.err already exists? c.m.Lock() @@ -154,25 +165,28 @@ func (c *Context) setReady() { c.m.Unlock() } -func (c *Context) loop() { - defer c.c.Close() +func (c *Context) addPlayer(p *playerImpl) { + c.m.Lock() + defer c.m.Unlock() + c.players[p] = struct{}{} - p := c.c.NewPlayer() - defer p.Close() - - for { - if !c.playable() { - runtime.Gosched() - continue - } - if _, err := io.CopyN(p, c.mux, 2048); err != nil { - c.setError(err) + // Check the source duplication + srcs := map[io.ReadCloser]struct{}{} + for p := range c.players { + if _, ok := srcs[p.src]; ok { + c.err = errors.New("audio: a same source is used by multiple Player") return } - c.setReady() + srcs[p.src] = struct{}{} } } +func (c *Context) removePlayer(p *playerImpl) { + c.m.Lock() + delete(c.players, p) + c.m.Unlock() +} + // IsReady returns a boolean value indicating whether the audio is ready or not. // // On some browsers, user interaction like click or pressing keys is required to start audio. @@ -236,9 +250,8 @@ type Player struct { } type playerImpl struct { - mux *mux + context *Context src io.ReadCloser - srcEOF bool sampleRate int playing bool closedExplicitly bool @@ -248,22 +261,9 @@ type playerImpl struct { pos int64 volume float64 - closeCh chan struct{} - closedCh chan struct{} - seekCh chan struct{} - seekedCh chan struct{} - proceedCh chan []int16 - proceededCh chan proceededValues - syncCh chan func() - m sync.Mutex } -type proceededValues struct { - buf []int16 - err error -} - // NewPlayer creates a new player with the given stream. // // src's format must be linear PCM (16bits little endian, 2 channel stereo) @@ -282,17 +282,10 @@ type proceededValues struct { func NewPlayer(context *Context, src io.ReadCloser) (*Player, error) { p := &Player{ &playerImpl{ - mux: context.mux, - src: src, - sampleRate: context.sampleRate, - buf: nil, - volume: 1, - closeCh: make(chan struct{}), - closedCh: make(chan struct{}), - seekCh: make(chan struct{}), - seekedCh: make(chan struct{}), - proceedCh: make(chan []int16), - proceededCh: make(chan proceededValues), + context: context, + src: src, + sampleRate: context.sampleRate, + volume: 1, }, } if seeker, ok := p.p.src.(io.Seeker); ok { @@ -348,62 +341,27 @@ func (p *Player) Close() error { func (p *playerImpl) Close() error { p.m.Lock() + defer p.m.Unlock() + p.playing = false - p.mux.removePlayer(p) if p.closedExplicitly { - p.m.Unlock() return fmt.Errorf("audio: the player is already closed") } p.closedExplicitly = true // src.Close is called only when Player's Close is called. if err := p.src.Close(); err != nil { - p.m.Unlock() return err } - p.m.Unlock() - return p.closeImpl() -} -func (p *playerImpl) ensureReadLoop() error { - p.m.Lock() - defer p.m.Unlock() - if p.closedExplicitly { - return fmt.Errorf("audio: the player is already closed") - } - if p.runningReadLoop { + if !p.runningReadLoop { return nil } - // Set runningReadLoop true here, not in the loop, or this causes deadlock with channels in Seek. - // While the for loop doesn't start, Seeks tries to send something to the channels. - // The for loop must start without any locking. - p.runningReadLoop = true - go p.readLoop() + // p.runningReadLoop is set to false in the loop. + return nil } -func (p *playerImpl) closeImpl() error { - p.m.Lock() - r := p.runningReadLoop - p.m.Unlock() - if !r { - return nil - } - - p.closeCh <- struct{}{} - <-p.closedCh - return nil -} - -func (p *playerImpl) bufferToInt16(lengthInBytes int) ([]int16, error) { - if err := p.ensureReadLoop(); err != nil { - return nil, err - } - p.proceedCh <- make([]int16, lengthInBytes/2) - r := <-p.proceededCh - return r.buf, r.err -} - // Play plays the stream. // // Play always returns nil. @@ -414,163 +372,115 @@ func (p *Player) Play() error { func (p *playerImpl) Play() { p.m.Lock() + defer p.m.Unlock() + + if p.closedExplicitly { + p.context.setError(fmt.Errorf("audio: the player is already closed")) + return + } + p.playing = true - p.mux.addPlayer(p) - p.m.Unlock() + if p.runningReadLoop { + return + } + + // Set p.runningReadLoop to true here, not in the loop. This prevents duplicated active loops. + p.runningReadLoop = true + p.context.addPlayer(p) + + go p.loop() + return } -func (p *playerImpl) readLoop() { +func (p *playerImpl) loop() { + w := p.context.c.NewPlayer() + wclosed := make(chan struct{}) + defer func() { + <-wclosed + w.Close() + }() + defer func() { p.m.Lock() p.playing = false + p.context.removePlayer(p) p.runningReadLoop = false p.m.Unlock() }() - timer := time.NewTimer(0) - timerCh := timer.C - var readErr error - for { - select { - case <-p.closeCh: - p.closedCh <- struct{}{} - return + ch := make(chan []byte) + defer close(ch) - case <-p.seekCh: - p.seekedCh <- struct{}{} - if timer != nil { - timer.Stop() - } - timer = time.NewTimer(time.Millisecond) - timerCh = timer.C - - case <-timerCh: - // If the buffer has 1 second, that's enough. - p.m.Lock() - if len(p.buf) >= p.sampleRate*bytesPerSample { - if timer != nil { - timer.Stop() - } - timer = time.NewTimer(100 * time.Millisecond) - timerCh = timer.C - p.m.Unlock() + go func() { + for buf := range ch { + if _, err := w.Write(buf); err != nil { + p.context.setError(err) break } - - // Try to read the buffer for 1/60[s]. - s := 60 - if web.IsAndroidChrome() { - s = 10 - } else if web.IsBrowser() { - s = 20 - } - l := p.sampleRate * bytesPerSample / s - l &= mask - buf := make([]byte, l) - n, err := p.src.Read(buf) - - p.buf = append(p.buf, buf[:n]...) - if err == io.EOF { - p.srcEOF = true - } - if p.srcEOF && len(p.buf) == 0 { - if timer != nil { - timer.Stop() - } - timer = nil - timerCh = nil - p.m.Unlock() - break - } - p.m.Unlock() - - if err != nil && err != io.EOF { - readErr = err - if timer != nil { - timer.Stop() - } - timer = nil - timerCh = nil - break - } - if timer != nil { - timer.Stop() - } - if web.IsBrowser() { - timer = time.NewTimer(10 * time.Millisecond) - } else { - timer = time.NewTimer(time.Millisecond) - } - timerCh = timer.C - - case buf := <-p.proceedCh: - if readErr != nil { - p.proceededCh <- proceededValues{buf, readErr} - return - } - - p.m.Lock() - if p.shouldSkipImpl() { - // Return zero values. - p.proceededCh <- proceededValues{buf, nil} - p.m.Unlock() - break - } - - lengthInBytes := len(buf) * 2 - l := lengthInBytes - - if l > len(p.buf) { - l = len(p.buf) - } - for i := 0; i < l/2; i++ { - buf[i] = int16(p.buf[2*i]) | (int16(p.buf[2*i+1]) << 8) - buf[i] = int16(float64(buf[i]) * p.volume) - } - p.pos += int64(l) - p.buf = p.buf[l:] - p.m.Unlock() - - p.proceededCh <- proceededValues{buf[:l/2], nil} } + close(wclosed) + }() + + for { + buf, ok := p.read() + if !ok { + return + } + ch <- buf } } -func (p *playerImpl) shouldSkip() bool { +func (p *playerImpl) read() ([]byte, bool) { p.m.Lock() - r := p.shouldSkipImpl() - p.m.Unlock() - return r -} + defer p.m.Unlock() -func (p *playerImpl) shouldSkipImpl() bool { - // When p.buf is nil, the player just starts playing or seeking. - // Note that this is different from len(p.buf) == 0 && p.buf != nil. - if p.buf == nil { - return true + if p.context.hasError() { + return nil, false } - if p.eofImpl() { - return true + + if p.closedExplicitly { + return nil, false } - return false -} -func (p *playerImpl) bufferSizeInBytes() int { - p.m.Lock() - s := len(p.buf) - p.m.Unlock() - return s -} + const bufSize = 2048 -func (p *playerImpl) eof() bool { - p.m.Lock() - r := p.eofImpl() - p.m.Unlock() - return r -} + var buf []byte + var err error + var proceed int64 + if p.playing && p.context.playable() { + newBuf := make([]byte, bufSize-len(p.buf)) + n := 0 + n, err = p.src.Read(newBuf) + buf = append(p.buf, newBuf[:n]...) -func (p *playerImpl) eofImpl() bool { - return p.srcEOF && len(p.buf) == 0 + n2 := len(buf) - len(buf)%bytesPerSample + buf, p.buf = buf[:n2], buf[n2:] + + proceed = int64(len(buf)) + } else { + // Fill zero values, or the driver can block forever as trying to proceed. + buf = make([]byte, bufSize) + } + + if err == io.EOF && len(buf) == 0 { + return nil, false + } + + if err != nil && err != io.EOF { + p.context.setError(err) + return nil, false + } + + for i := 0; i < len(buf)/2; i++ { + v16 := int16(buf[2*i]) | (int16(buf[2*i+1]) << 8) + v16 = int16(float64(v16) * p.volume) + buf[2*i] = byte(v16) + buf[2*i+1] = byte(v16 >> 8) + } + p.pos += proceed + p.context.setReady() + + return buf, true } // IsPlaying returns boolean indicating whether the player is playing. @@ -614,13 +524,12 @@ func (p *playerImpl) Seek(offset time.Duration) error { if _, ok := p.src.(io.Seeker); !ok { panic("audio: player to be sought must be io.Seeker") } - if err := p.ensureReadLoop(); err != nil { - return err - } p.m.Lock() + defer p.m.Unlock() + o := int64(offset) * bytesPerSample * int64(p.sampleRate) / int64(time.Second) - o &= mask + o = o - (o % bytesPerSample) seeker, ok := p.src.(io.Seeker) if !ok { @@ -628,17 +537,11 @@ func (p *playerImpl) Seek(offset time.Duration) error { } pos, err := seeker.Seek(o, io.SeekStart) if err != nil { - p.m.Unlock() return err } p.buf = nil p.pos = pos - p.srcEOF = false - p.m.Unlock() - - p.seekCh <- struct{}{} - <-p.seekedCh return nil } @@ -653,7 +556,6 @@ func (p *Player) Pause() error { func (p *playerImpl) Pause() { p.m.Lock() p.playing = false - p.mux.removePlayer(p) p.m.Unlock() } diff --git a/audio/audio_test.go b/audio/audio_test.go index 63721b294..280760fb2 100644 --- a/audio/audio_test.go +++ b/audio/audio_test.go @@ -41,6 +41,9 @@ func TestGC(t *testing.T) { } p.Play() + // 200[ms] should be enough all the bytes are consumed. + // TODO: This is a darty hack. Would it be possible to use virtual time? + time.Sleep(200 * time.Millisecond) got = PlayersNumForTesting() if want := 1; got != want { t.Errorf("PlayersNum() after Play: got: %d, want: %d", got, want) diff --git a/audio/mux.go b/audio/mux.go deleted file mode 100644 index 69a783545..000000000 --- a/audio/mux.go +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright 2019 The Ebiten Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package audio - -import ( - "errors" - "io" - "runtime" - "sync" -) - -type mux struct { - ps map[*playerImpl]struct{} - m sync.RWMutex -} - -const ( - channelNum = 2 - bytesPerSample = 2 * channelNum - - // TODO: This assumes that bytesPerSample is a power of 2. - mask = ^(bytesPerSample - 1) -) - -func newMux() *mux { - return &mux{ - ps: map[*playerImpl]struct{}{}, - } -} - -func (m *mux) Read(b []byte) (int, error) { - m.m.Lock() - defer m.m.Unlock() - - // Check the source duplication - srcs := map[io.ReadCloser]struct{}{} - for p := range m.ps { - if _, ok := srcs[p.src]; ok { - return 0, errors.New("audio: a same source is used by multiple Player") - } - srcs[p.src] = struct{}{} - } - - if len(m.ps) == 0 { - l := len(b) - l &= mask - copy(b, make([]byte, l)) - return l, nil - } - - l := len(b) - l &= mask - - allSkipped := true - - // TODO: Now a player is not locked. Should we lock it? - - for p := range m.ps { - if p.shouldSkip() { - continue - } - allSkipped = false - s := p.bufferSizeInBytes() - if l > s { - l = s - l &= mask - } - } - - if allSkipped { - l = 0 - } - - if l == 0 { - // If l is 0, all the ps might reach EOF at the next update. - // However, this Read might block forever and never causes context switch - // on single-thread environment (e.g. browser). - // Call Gosched to cause context switch on purpose. - runtime.Gosched() - } - - b16s := [][]int16{} - for p := range m.ps { - buf, err := p.bufferToInt16(l) - if err != nil { - return 0, err - } - if l > len(buf)*2 { - l = len(buf) * 2 - } - b16s = append(b16s, buf) - } - - for i := 0; i < l/2; i++ { - x := 0 - for _, b16 := range b16s { - x += int(b16[i]) - } - if x > (1<<15)-1 { - x = (1 << 15) - 1 - } - if x < -(1 << 15) { - x = -(1 << 15) - } - b[2*i] = byte(x) - b[2*i+1] = byte(x >> 8) - } - - closed := []*playerImpl{} - for p := range m.ps { - if p.eof() { - closed = append(closed, p) - } - } - for _, p := range closed { - p.closeImpl() - delete(m.ps, p) - } - - return l, nil -} - -func (m *mux) addPlayer(player *playerImpl) { - m.m.Lock() - m.ps[player] = struct{}{} - m.m.Unlock() -} - -func (m *mux) removePlayer(player *playerImpl) { - m.m.Lock() - delete(m.ps, player) - m.m.Unlock() -} diff --git a/audio/testinternal_test.go b/audio/testinternal_test.go index 6d3b8b4a6..f8f3314da 100644 --- a/audio/testinternal_test.go +++ b/audio/testinternal_test.go @@ -16,8 +16,8 @@ package audio func PlayersNumForTesting() int { c := CurrentContext() - c.mux.m.Lock() - n := len(c.mux.ps) - c.mux.m.Unlock() + c.m.Lock() + n := len(c.players) + c.m.Unlock() return n } diff --git a/go.mod b/go.mod index 75fc86ccc..3093aa0b1 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/gopherjs/gopherwasm v1.1.0 github.com/hajimehoshi/bitmapfont v1.1.1 github.com/hajimehoshi/go-mp3 v0.2.0 - github.com/hajimehoshi/oto v0.3.3 + github.com/hajimehoshi/oto v0.3.4-0.20190429120749-cf91156c8120 github.com/jakecoffman/cp v0.1.0 github.com/jfreymuth/oggvorbis v1.0.0 github.com/jfreymuth/vorbis v1.0.0 // indirect diff --git a/go.sum b/go.sum index 8d6485241..dfae46e33 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ github.com/hajimehoshi/go-mp3 v0.1.2/go.mod h1:4i+c5pDNKDrxl1iu9iG90/+fhP37lio6g github.com/hajimehoshi/go-mp3 v0.2.0 h1:isy34iDg+96PsNuFbTdRRXzKr6a1gc2nhsPuFSfXacY= github.com/hajimehoshi/go-mp3 v0.2.0/go.mod h1:4i+c5pDNKDrxl1iu9iG90/+fhP37lio6gNhjCx9WBJw= github.com/hajimehoshi/oto v0.1.1/go.mod h1:hUiLWeBQnbDu4pZsAhOnGqMI1ZGibS6e2qhQdfpwz04= -github.com/hajimehoshi/oto v0.3.3 h1:Wi7VVtxe9sF2rbDBIJtVXnpFWhRfK57hw0JY7tR2qXM= -github.com/hajimehoshi/oto v0.3.3/go.mod h1:e9eTLBB9iZto045HLbzfHJIc+jP3xaKrjZTghvb6fdM= +github.com/hajimehoshi/oto v0.3.4-0.20190429120749-cf91156c8120 h1:A2KJGAMyVidi7XxtrgfjFwq6KS2U3hYKdf+46tIycwM= +github.com/hajimehoshi/oto v0.3.4-0.20190429120749-cf91156c8120/go.mod h1:e9eTLBB9iZto045HLbzfHJIc+jP3xaKrjZTghvb6fdM= github.com/jakecoffman/cp v0.1.0 h1:sgSYEGUgfwiT447fRjloa2c5b6UyYP+7muR3gQK+Ep0= github.com/jakecoffman/cp v0.1.0/go.mod h1:a3xPx9N8RyFAACD644t2dj/nK4SuLg1v+jL61m2yVo4= github.com/jfreymuth/oggvorbis v1.0.0 h1:aOpiihGrFLXpsh2osOlEvTcg5/aluzGQeC7m3uYWOZ0=