From a5f8c1b1f8e8adeb0388d56dac9cd8b5f3e40ec0 Mon Sep 17 00:00:00 2001 From: Hajime Hoshi Date: Sat, 23 Dec 2017 02:05:49 +0900 Subject: [PATCH] audio: Async reading sources (#443) --- audio/audio.go | 184 +++++++++++++++++++++++++++++-------------------- 1 file changed, 111 insertions(+), 73 deletions(-) diff --git a/audio/audio.go b/audio/audio.go index 386194a35..f1d01f2cb 100644 --- a/audio/audio.go +++ b/audio/audio.go @@ -61,7 +61,7 @@ const ( mask = ^(channelNum*bytesPerSample - 1) ) -func (p *players) Read(b []uint8) (int, error) { +func (p *players) Read(b []byte) (int, error) { p.Lock() defer p.Unlock() @@ -72,25 +72,18 @@ func (p *players) Read(b []uint8) (int, error) { if len(players) == 0 { l := len(b) l &= mask - copy(b, make([]uint8, l)) + copy(b, make([]byte, l)) return l, nil } - closed := []*Player{} - - for _, player := range players { - player.resetBufferIfSeeking() - } l := len(b) for _, player := range players { - n, err := player.readToBuffer(l) - if err == io.EOF { - closed = append(closed, player) - } else if err != nil { - return 0, err - } - if n < l { - l = n + select { + case err := <-player.readCh: + if err != nil { + return 0, err + } + default: } } l &= mask @@ -115,12 +108,11 @@ func (p *players) Read(b []uint8) (int, error) { } for _, player := range players { - player.proceed(l) + if player.eof() { + delete(p.players, player) + } } - for _, pl := range closed { - delete(p.players, pl) - } return l, nil } @@ -278,7 +270,7 @@ func (c *Context) loop() { l := (c.frames * int64(bytesPerFrame)) - c.writtenBytes l &= mask c.writtenBytes += l - buf := make([]uint8, l) + buf := make([]byte, l) if _, err := io.ReadFull(c.players, buf); err != nil { audiobinding.SetError(err) return @@ -314,7 +306,7 @@ type bytesReadSeekCloser struct { reader *bytes.Reader } -func (b *bytesReadSeekCloser) Read(buf []uint8) (int, error) { +func (b *bytesReadSeekCloser) Read(buf []byte) (int, error) { return b.reader.Read(buf) } @@ -328,12 +320,12 @@ func (b *bytesReadSeekCloser) Close() error { } // BytesReadSeekCloser creates ReadSeekCloser from bytes. -func BytesReadSeekCloser(b []uint8) ReadSeekCloser { +func BytesReadSeekCloser(b []byte) ReadSeekCloser { return &bytesReadSeekCloser{reader: bytes.NewReader(b)} } type readingResult struct { - data []uint8 + data []byte err error } @@ -341,17 +333,18 @@ type readingResult struct { type Player struct { players *players src ReadSeekCloser + srcEOF bool sampleRate int - buf []uint8 + buf []byte pos int64 volume float64 - seeking bool - nextPos int64 + readCh chan error + closeCh chan struct{} + closedCh chan struct{} - srcM sync.Mutex - m sync.RWMutex + m sync.RWMutex } // NewPlayer creates a new player with the given stream. @@ -372,7 +365,7 @@ func NewPlayer(context *Context, src ReadSeekCloser) (*Player, error) { players: context.players, src: src, sampleRate: context.sampleRate, - buf: []uint8{}, + buf: []byte{}, volume: 1, } // Get the current position of the source. @@ -393,7 +386,7 @@ func NewPlayer(context *Context, src ReadSeekCloser) (*Player, error) { // The format of src should be same as noted at NewPlayer. // // NewPlayerFromBytes's error is always nil as of 1.5.0-alpha. -func NewPlayerFromBytes(context *Context, src []uint8) (*Player, error) { +func NewPlayerFromBytes(context *Context, src []byte) (*Player, error) { b := BytesReadSeekCloser(src) p, err := NewPlayer(context, b) if err != nil { @@ -410,44 +403,40 @@ func NewPlayerFromBytes(context *Context, src []uint8) (*Player, error) { // // Close returns error when closing the source returns error. func (p *Player) Close() error { - p.players.removePlayer(p) runtime.SetFinalizer(p, nil) - p.srcM.Lock() - err := p.src.Close() - p.srcM.Unlock() - return err -} + p.players.removePlayer(p) -func (p *Player) readToBuffer(length int) (int, error) { - b := make([]uint8, length) - p.srcM.Lock() - n, err := p.src.Read(b) - p.srcM.Unlock() - if err != nil { - return 0, err - } - p.buf = append(p.buf, b[:n]...) - return len(p.buf), nil + p.m.Lock() + err := p.src.Close() + p.m.Unlock() + + close(p.closeCh) + <-p.closedCh + + return err } func (p *Player) bufferToInt16(lengthInBytes int) []int16 { r := make([]int16, lengthInBytes/2) - // This function must be called on the same goruotine of readToBuffer. - p.m.RLock() - for i := 0; i < lengthInBytes/2; i++ { + + p.m.Lock() + l := lengthInBytes + if len(p.buf) < lengthInBytes { + if !p.srcEOF { + p.m.Unlock() + return r + } + l = len(p.buf) + } + for i := 0; i < l/2; i++ { r[i] = int16(p.buf[2*i]) | (int16(p.buf[2*i+1]) << 8) r[i] = int16(float64(r[i]) * p.volume) } - p.m.RUnlock() - return r -} -func (p *Player) proceed(length int) { - // This function must be called on the same goruotine of readToBuffer. - p.m.Lock() - p.buf = p.buf[length:] - p.pos += int64(length) + p.pos += int64(l) + p.buf = p.buf[l:] p.m.Unlock() + return r } // Play plays the stream. @@ -455,9 +444,67 @@ func (p *Player) proceed(length int) { // Play always returns nil. func (p *Player) Play() error { p.players.addPlayer(p) + p.startRead() return nil } +func (p *Player) startRead() { + p.m.Lock() + if p.readCh == nil { + p.readCh = make(chan error) + p.closeCh = make(chan struct{}) + p.closedCh = make(chan struct{}) + p.srcEOF = false + go func() { + if err := p.readLoop(); err != nil { + p.readCh <- err + } + p.m.Lock() + p.readCh = nil + p.m.Unlock() + close(p.closedCh) + }() + } + p.m.Unlock() +} + +func (p *Player) readLoop() error { + t := time.Tick(1 * time.Millisecond) + for { + select { + case <-p.closeCh: + p.closeCh = nil + return nil + case <-t: + p.m.Lock() + if len(p.buf) < 4096*16 && !p.srcEOF { + buf := make([]byte, 4096) + n, err := p.src.Read(buf) + p.buf = append(p.buf, buf[:n]...) + if err == io.EOF { + p.srcEOF = true + } + if p.srcEOF && len(p.buf) == 0 { + p.m.Unlock() + return nil + } + if err != nil && err != io.EOF { + p.m.Unlock() + return err + } + } + p.m.Unlock() + } + } +} + +func (p *Player) eof() bool { + p.m.Lock() + r := p.srcEOF && len(p.buf) == 0 + p.m.Unlock() + return r +} + // IsPlaying returns boolean indicating whether the player is playing. func (p *Player) IsPlaying() bool { return p.players.hasPlayer(p) @@ -476,28 +523,19 @@ func (p *Player) Rewind() error { func (p *Player) Seek(offset time.Duration) error { o := int64(offset) * bytesPerSample * channelNum * int64(p.sampleRate) / int64(time.Second) o &= mask - p.srcM.Lock() + + p.m.Lock() pos, err := p.src.Seek(o, io.SeekStart) - p.srcM.Unlock() if err != nil { + p.m.Unlock() return err } - p.m.Lock() - p.seeking = true - p.nextPos = pos + p.buf = nil + p.pos = pos + p.srcEOF = false p.m.Unlock() - return nil -} -func (p *Player) resetBufferIfSeeking() { - // This function must be called on the same goruotine of readToBuffer. - p.m.Lock() - if p.seeking { - p.buf = []uint8{} - p.pos = p.nextPos - p.seeking = false - } - p.m.Unlock() + return nil } // Pause pauses the playing.