From 8d6f2e3fe8851b581309da25fc4c32f8be675932 Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Mon, 11 Jul 2016 16:31:42 -0400
Subject: [PATCH] Fix issues with tailing rotated jsonlog file

Fixes a race where the log reader would get events both for an actual
rotation and from fsnotify (`fsnotify.Rename`), making a single
rotation look like two.
This issue becomes extremely apparent when rotations are fast, for
example:

```
$ docker run -d --name test --log-opt max-size=1 --log-opt max-file=2 busybox sh -c 'while true; do echo hello; usleep 100000; done'
```

With this change the log reader for jsonlogs can handle rotations that
happen as fast as in the example above.

Instead of listening for both fs events AND rotation events
simultaneously, which could surface two rotations for a single actual
rotation due to channel buffering, only listen for fs events (like
`Rename`) and then wait to be notified about the rotation by the
logger. This makes sure that we don't see two rotations for one, and
that we don't start trying to read until the logger is actually ready
for us to.

Signed-off-by: Brian Goff

This is a pending upstream commit that fixes broken log tailing. The
original commit can be found in the PR here:

- https://github.com/docker/docker/pull/24514

Signed-off-by: Christian Stewart
---
 daemon/logger/jsonfilelog/read.go | 180 +++++++++++++++++++++++++-------------
 1 file changed, 119 insertions(+), 61 deletions(-)

diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go
index bea83dd..0cb44af 100644
--- a/daemon/logger/jsonfilelog/read.go
+++ b/daemon/logger/jsonfilelog/read.go
@@ -3,11 +3,14 @@ package jsonfilelog
 import (
 	"bytes"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"os"
 	"time"
 
+	"gopkg.in/fsnotify.v1"
+
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/pkg/filenotify"
@@ -44,6 +47,10 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
 func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
 	defer close(logWatcher.Msg)
 
+	// Lock so the read stream doesn't get corrupted due to rotations or other log data written while we read.
+	// This will block writes!!!
+	l.mu.Lock()
+
 	pth := l.writer.LogPath()
 	var files []io.ReadSeeker
 	for i := l.writer.MaxFiles(); i > 1; i-- {
@@ -61,6 +68,7 @@
 	latestFile, err := os.Open(pth)
 	if err != nil {
 		logWatcher.Err <- err
+		l.mu.Unlock()
 		return
 	}
 
@@ -80,6 +88,7 @@
 		if err := latestFile.Close(); err != nil {
 			logrus.Errorf("Error closing file: %v", err)
 		}
+		l.mu.Unlock()
 		return
 	}
 
@@ -87,7 +96,6 @@
 		latestFile.Seek(0, os.SEEK_END)
 	}
 
-	l.mu.Lock()
 	l.readers[logWatcher] = struct{}{}
 	l.mu.Unlock()
 
@@ -128,92 +136,142 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti
 	}
 }
 
+func watchFile(name string) (filenotify.FileWatcher, error) {
+	fileWatcher, err := filenotify.New()
+	if err != nil {
+		return nil, err
+	}
+
+	if err := fileWatcher.Add(name); err != nil {
+		logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
+		fileWatcher.Close()
+		fileWatcher = filenotify.NewPollingWatcher()
+
+		if err := fileWatcher.Add(name); err != nil {
+			fileWatcher.Close()
+			logrus.Debugf("error watching log file for modifications: %v", err)
+			return nil, err
+		}
+	}
+	return fileWatcher, nil
+}
+
 func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
 	dec := json.NewDecoder(f)
 	l := &jsonlog.JSONLog{}
 
-	fileWatcher, err := filenotify.New()
+	name := f.Name()
+	fileWatcher, err := watchFile(name)
 	if err != nil {
 		logWatcher.Err <- err
+		return
 	}
 	defer func() {
 		f.Close()
 		fileWatcher.Close()
 	}()
-	name := f.Name()
 
-	if err := fileWatcher.Add(name); err != nil {
-		logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
-		fileWatcher.Close()
-		fileWatcher = filenotify.NewPollingWatcher()
+	var retries int
+	handleRotate := func() error {
+		f.Close()
+		fileWatcher.Remove(name)
 
+		// retry when the file doesn't exist
+		for retries := 0; retries <= 5; retries++ {
+			f, err = os.Open(name)
+			if err == nil || !os.IsNotExist(err) {
+				break
+			}
+		}
+		if err != nil {
+			return err
+		}
 		if err := fileWatcher.Add(name); err != nil {
-			logrus.Debugf("error watching log file for modifications: %v", err)
-			logWatcher.Err <- err
-			return
+			return err
 		}
+		dec = json.NewDecoder(f)
+		return nil
 	}
 
-	var retries int
-	for {
-		msg, err := decodeLogLine(dec, l)
-		if err != nil {
-			if err != io.EOF {
-				// try again because this shouldn't happen
-				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
-					dec = json.NewDecoder(f)
-					retries++
-					continue
+	errRetry := errors.New("retry")
+	errDone := errors.New("done")
+	waitRead := func() error {
+		select {
+		case e := <-fileWatcher.Events():
+			switch e.Op {
+			case fsnotify.Write:
+				dec = json.NewDecoder(f)
+				return nil
+			case fsnotify.Rename, fsnotify.Remove:
+				<-notifyRotate
+				if err := handleRotate(); err != nil {
+					return err
 				}
-
-				// io.ErrUnexpectedEOF is returned from json.Decoder when there is
-				// remaining data in the parser's buffer while an io.EOF occurs.
-				// If the json logger writes a partial json log entry to the disk
-				// while at the same time the decoder tries to decode it, the race condition happens.
-				if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
-					reader := io.MultiReader(dec.Buffered(), f)
-					dec = json.NewDecoder(reader)
-					retries++
-					continue
+				return nil
+			}
+			return errRetry
+		case err := <-fileWatcher.Errors():
+			logrus.Debugf("logger got error watching file: %v", err)
+			// Something happened, let's try and stay alive and create a new watcher
+			if retries <= 5 {
+				fileWatcher, err = watchFile(name)
+				if err != nil {
+					return err
 				}
-
-				return
+				retries++
+				return errRetry
 			}
+			return err
+		case <-logWatcher.WatchClose():
+			fileWatcher.Remove(name)
+			return errDone
+		}
+	}
 
-			select {
-			case <-fileWatcher.Events():
-				dec = json.NewDecoder(f)
-				continue
-			case <-fileWatcher.Errors():
-				logWatcher.Err <- err
-				return
-			case <-logWatcher.WatchClose():
-				fileWatcher.Remove(name)
-				return
-			case <-notifyRotate:
-				f.Close()
-				fileWatcher.Remove(name)
-
-				// retry when the file doesn't exist
-				for retries := 0; retries <= 5; retries++ {
-					f, err = os.Open(name)
-					if err == nil || !os.IsNotExist(err) {
-						break
-					}
-				}
+	handleDecodeErr := func(err error) error {
+		if err == io.EOF {
+			for err := waitRead(); err != nil; {
+				if err == errRetry {
+					// retry the waitRead
+					err = waitRead()
+					continue
+				}
+				return err
+			}
+			return nil
+		}
+		// try again because this shouldn't happen
+		if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
+			dec = json.NewDecoder(f)
+			retries++
+			return nil
+		}
+		// io.ErrUnexpectedEOF is returned from json.Decoder when there is
+		// remaining data in the parser's buffer while an io.EOF occurs.
+		// If the json logger writes a partial json log entry to the disk
+		// while at the same time the decoder tries to decode it, the race condition happens.
+		if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
+			reader := io.MultiReader(dec.Buffered(), f)
+			dec = json.NewDecoder(reader)
+			retries++
+			return nil
+		}
+		return err
+	}
 
-			if err = fileWatcher.Add(name); err != nil {
-				logWatcher.Err <- err
-				return
-			}
-			if err != nil {
-				logWatcher.Err <- err
+	// main loop
+	for {
+		msg, err := decodeLogLine(dec, l)
+		if err != nil {
+			if err := handleDecodeErr(err); err != nil {
+				if err == errDone {
 					return
 				}
-
-				dec = json.NewDecoder(f)
-				continue
+				// we got an unrecoverable error, so return
+				logWatcher.Err <- err
+				return
 			}
+			// ready to try again
+			continue
 		}
 
 		retries = 0 // reset retries since we've succeeded
-- 
2.7.3
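A note for readers of this patch: the sketch below is a minimal, self-contained illustration of the handshake the new `waitRead` path relies on; it is not Docker's code. The watcher is simulated with a plain string channel and the file name is made up, but the ordering is the point: the reader treats the fs `Rename` event only as a hint, and blocks on `notifyRotate` until the logger reports that the rotation has finished, so one rotation is handled exactly once.

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	const name = "example.log" // hypothetical log file
	if err := os.WriteFile(name, []byte("hello\n"), 0644); err != nil {
		panic(err)
	}
	defer os.Remove(name)
	defer os.Remove(name + ".1")

	fsEvents := make(chan string)          // stand-in for fileWatcher.Events()
	notifyRotate := make(chan interface{}) // the logger's "rotation complete" signal

	// Logger side: rotate the file, then notify the reader.
	go func() {
		if err := os.Rename(name, name+".1"); err != nil { // the step fsnotify reports as Rename
			panic(err)
		}
		f, err := os.Create(name) // fresh, empty current log
		if err != nil {
			panic(err)
		}
		f.Close()
		fsEvents <- "rename"       // the fs event arrives first...
		notifyRotate <- struct{}{} // ...the explicit rotation signal second
	}()

	// Reader side: the fs event alone is only a hint; reopen only after
	// the logger says the rotation is complete.
	if ev := <-fsEvents; ev == "rename" {
		<-notifyRotate
		f, err := os.Open(name)
		if err != nil {
			fmt.Println("reopen failed:", err)
			return
		}
		f.Close()
		fmt.Println("handled one rotation exactly once")
	}
}
```

In this sketch the unbuffered `notifyRotate` also gives backpressure: the goroutine standing in for the logger cannot proceed past a rotation until the reader has consumed the signal, which is the "don't start trying to read until the logger is actually ready" property the commit message describes.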
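Similarly hedged, this standalone sketch shows the `io.ErrUnexpectedEOF` recovery the patch keeps (the `io.MultiReader(dec.Buffered(), f)` lines): a failed `Decode` leaves the partially read entry in the decoder's buffer, and stitching `dec.Buffered()` back in front of the data source lets a fresh decoder retry without dropping those bytes. The truncated strings are contrived stand-ins for a log file caught mid-write.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

type entry struct {
	Log string `json:"log"`
}

func main() {
	// A log stream caught mid-write: the second entry is truncated.
	head := strings.NewReader(`{"log":"hello"}` + "\n" + `{"log":"wor`)
	dec := json.NewDecoder(head)

	var e entry
	for {
		err := dec.Decode(&e)
		if err == nil {
			fmt.Println("decoded:", e.Log)
			continue
		}
		if err == io.ErrUnexpectedEOF {
			// The writer has since appended the rest of the entry; in the
			// real logger this is the same file growing underneath us.
			rest := strings.NewReader(`ld"}` + "\n")
			// Stitch the decoder's leftovers back in front of the new data
			// so the truncated entry is decoded whole on the retry.
			dec = json.NewDecoder(io.MultiReader(dec.Buffered(), rest))
			continue
		}
		break // io.EOF, or a genuine decode error
	}
	// Prints:
	// decoded: hello
	// decoded: world
}
```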