diff --git a/audio/audio.go b/audio/audio.go index cdd694332..84fa64466 100644 --- a/audio/audio.go +++ b/audio/audio.go @@ -39,8 +39,7 @@ import ( ) type players struct { - players map[*Player]struct{} - seekings map[*Player]struct{} + players map[*Player]struct{} sync.RWMutex } @@ -65,9 +64,6 @@ func (p *players) Read(b []uint8) (int, error) { players := []*Player{} for player := range p.players { - if _, ok := p.seekings[player]; ok { - continue - } players = append(players, player) } if len(players) == 0 { @@ -79,12 +75,13 @@ func (p *players) Read(b []uint8) (int, error) { closed := []*Player{} l := len(b) for _, player := range players { - if err := player.readToBuffer(l); err == io.EOF { + n, err := player.readToBuffer(l) + if err == io.EOF { closed = append(closed, player) } else if err != nil { return 0, err } - l = min(player.bufferLength(), l) + l = min(n, l) } l &= mask b16s := [][]int16{} @@ -126,18 +123,6 @@ func (p *players) removePlayer(player *Player) { p.Unlock() } -func (p *players) addSeeking(player *Player) { - p.Lock() - p.seekings[player] = struct{}{} - p.Unlock() -} - -func (p *players) removeSeeking(player *Player) { - p.Lock() - delete(p.seekings, player) - p.Unlock() -} - func (p *players) hasPlayer(player *Player) bool { p.RLock() _, ok := p.players[player] @@ -213,8 +198,7 @@ func NewContext(sampleRate int) (*Context, error) { } theContext = c c.players = &players{ - players: map[*Player]struct{}{}, - seekings: map[*Player]struct{}{}, + players: map[*Player]struct{}{}, } return c, nil @@ -306,6 +290,7 @@ type Player struct { players *players src ReadSeekCloser sampleRate int + readingCh chan error buf []uint8 pos int64 @@ -380,21 +365,41 @@ func (p *Player) Close() error { return err } -func (p *Player) readToBuffer(length int) error { - bb := make([]uint8, length) - p.srcM.Lock() - n, err := p.src.Read(bb) - p.srcM.Unlock() - if 0 < n { - p.m.Lock() - p.buf = append(p.buf, bb[:n]...) - p.m.Unlock() +func (p *Player) readToBuffer(length int) (int, error) { + if p.readingCh == nil { + p.readingCh = make(chan error) + go func() { + defer close(p.readingCh) + bb := make([]uint8, length) + p.srcM.Lock() + n, err := p.src.Read(bb) + p.srcM.Unlock() + if err != nil { + p.readingCh <- err + return + } + if 0 < n { + p.m.Lock() + p.buf = append(p.buf, bb[:n]...) + p.m.Unlock() + } + }() + } + select { + case err := <-p.readingCh: + p.readingCh = nil + return len(p.buf), err + case <-time.After(15 * time.Millisecond): + return length, nil } - return err } func (p *Player) bufferToInt16(lengthInBytes int) []int16 { r := make([]int16, lengthInBytes/2) + // This function must be called on the same goruotine of readToBuffer. + if p.readingCh != nil { + return r + } p.m.RLock() for i := 0; i < lengthInBytes/2; i++ { r[i] = int16(p.buf[2*i]) | (int16(p.buf[2*i+1]) << 8) @@ -405,19 +410,16 @@ func (p *Player) bufferToInt16(lengthInBytes int) []int16 { } func (p *Player) proceed(length int) { + // This function must be called on the same goruotine of readToBuffer. + if p.readingCh != nil { + return + } p.m.Lock() p.buf = p.buf[length:] p.pos += int64(length) p.m.Unlock() } -func (p *Player) bufferLength() int { - p.m.RLock() - l := len(p.buf) - p.m.RUnlock() - return l -} - // Play plays the stream. // // Play always returns nil. @@ -450,12 +452,9 @@ func (p *Player) Rewind() error { // // Seek returns error when seeking the source returns error. func (p *Player) Seek(offset time.Duration) error { - p.players.addSeeking(p) - defer p.players.removeSeeking(p) o := int64(offset) * bytesPerSample * channelNum * int64(p.sampleRate) / int64(time.Second) o &= mask p.srcM.Lock() - // TODO: Seek finishes soon but after that, Read takes long time and players.Read can block. pos, err := p.src.Seek(o, io.SeekStart) p.srcM.Unlock() if err != nil {