0001-Fix-issues-with-tailing-rotated-jsonlog-file.patch
From 8d6f2e3fe8851b581309da25fc4c32f8be675932 Mon Sep 17 00:00:00 2001
From: Brian Goff <cpuguy83@gmail.com>
Date: Mon, 11 Jul 2016 16:31:42 -0400
Subject: [PATCH] Fix issues with tailing rotated jsonlog file

Fixes a race where the log reader would get events for both an actual
rotation as well as from fsnotify (`fsnotify.Rename`).
This issue becomes extremely apparent when rotations are fast, for
example:

```
$ docker run -d --name test --log-opt max-size=1 --log-opt max-file=2 \
    busybox sh -c 'while true; do echo hello; usleep 100000; done'
```

With this change the log reader for jsonlogs can handle rotations that
happen as above.

Instead of listening for both fs events AND rotation events
simultaneously, which could make a single rotation look like two due to
channel buffering, only listen for fs events (like `Rename`) and then
wait to be notified about the rotation by the logger.
This makes sure that we don't see 2 rotations for 1, and that we don't
start trying to read again until the logger is actually ready for us to
read.
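
The core of the new flow looks roughly like the sketch below. This is a
simplified illustration rather than the patch code itself: `waitForRead`
and `rotated` are placeholder names, while `notifyRotate`, the fsnotify
ops, and the "wait for the logger before reopening" step mirror what the
patch's `waitRead` closure does.

```
package tailsketch

import (
	"gopkg.in/fsnotify.v1"
)

// waitForRead blocks until the tailed file either has new data (returns
// nil) or has been rotated, in which case it reopens via rotated().
// All parameter names here are placeholders for this sketch.
func waitForRead(events <-chan fsnotify.Event, errs <-chan error,
	notifyRotate <-chan interface{}, rotated func() error) error {
	for {
		select {
		case e := <-events:
			switch e.Op {
			case fsnotify.Write:
				// More bytes were appended; resume decoding.
				return nil
			case fsnotify.Rename, fsnotify.Remove:
				// The file vanished, but do NOT reopen yet: wait for
				// the logger to signal that the rotation is finished,
				// so a single rotation is never handled twice.
				<-notifyRotate
				return rotated()
			}
			// Other ops (Create, Chmod, ...): keep waiting.
		case err := <-errs:
			return err
		}
	}
}
```

The reopen only ever happens after the logger's own rotation path has
completed, which is what keeps the reader from reopening a half-rotated
file.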

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

This is a pending upstream commit fixing broken log tailing. The
original commit can be found in the PR here:

  - https://github.com/docker/docker/pull/24514

Signed-off-by: Christian Stewart <christian@paral.in>
---
 daemon/logger/jsonfilelog/read.go | 180 +++++++++++++++++++++++++-------------
 1 file changed, 119 insertions(+), 61 deletions(-)

diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go
index bea83dd..0cb44af 100644
--- a/daemon/logger/jsonfilelog/read.go
+++ b/daemon/logger/jsonfilelog/read.go
@@ -3,11 +3,14 @@ package jsonfilelog
 import (
 	"bytes"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"os"
 	"time"
 
+	"gopkg.in/fsnotify.v1"
+
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/pkg/filenotify"
@@ -44,6 +47,10 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
 func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
 	defer close(logWatcher.Msg)
 
+	// lock so the read stream doesn't get corrupted due to rotations or other log data written while we read
+	// This will block writes!!!
+	l.mu.Lock()
+
 	pth := l.writer.LogPath()
 	var files []io.ReadSeeker
 	for i := l.writer.MaxFiles(); i > 1; i-- {
@@ -61,6 +68,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 	latestFile, err := os.Open(pth)
 	if err != nil {
 		logWatcher.Err <- err
+		l.mu.Unlock()
 		return
 	}
 
@@ -80,6 +88,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 		if err := latestFile.Close(); err != nil {
 			logrus.Errorf("Error closing file: %v", err)
 		}
+		l.mu.Unlock()
 		return
 	}
 
@@ -87,7 +96,6 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 		latestFile.Seek(0, os.SEEK_END)
 	}
 
-	l.mu.Lock()
 	l.readers[logWatcher] = struct{}{}
 	l.mu.Unlock()
 
@@ -128,92 +136,142 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti
 	}
 }
 
+func watchFile(name string) (filenotify.FileWatcher, error) {
+	fileWatcher, err := filenotify.New()
+	if err != nil {
+		return nil, err
+	}
+
+	if err := fileWatcher.Add(name); err != nil {
+		logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
+		fileWatcher.Close()
+		fileWatcher = filenotify.NewPollingWatcher()
+
+		if err := fileWatcher.Add(name); err != nil {
+			fileWatcher.Close()
+			logrus.Debugf("error watching log file for modifications: %v", err)
+			return nil, err
+		}
+	}
+	return fileWatcher, nil
+}
+
 func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
 	dec := json.NewDecoder(f)
 	l := &jsonlog.JSONLog{}
 
-	fileWatcher, err := filenotify.New()
+	name := f.Name()
+	fileWatcher, err := watchFile(name)
 	if err != nil {
 		logWatcher.Err <- err
+		return
 	}
 	defer func() {
 		f.Close()
 		fileWatcher.Close()
 	}()
-	name := f.Name()
 
-	if err := fileWatcher.Add(name); err != nil {
-		logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
-		fileWatcher.Close()
-		fileWatcher = filenotify.NewPollingWatcher()
+	var retries int
+	handleRotate := func() error {
+		f.Close()
+		fileWatcher.Remove(name)
 
+		// retry when the file doesn't exist
+		for retries := 0; retries <= 5; retries++ {
+			f, err = os.Open(name)
+			if err == nil || !os.IsNotExist(err) {
+				break
+			}
+		}
+		if err != nil {
+			return err
+		}
 		if err := fileWatcher.Add(name); err != nil {
-			logrus.Debugf("error watching log file for modifications: %v", err)
-			logWatcher.Err <- err
-			return
+			return err
 		}
+		dec = json.NewDecoder(f)
+		return nil
 	}
 
-	var retries int
-	for {
-		msg, err := decodeLogLine(dec, l)
-		if err != nil {
-			if err != io.EOF {
-				// try again because this shouldn't happen
-				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
-					dec = json.NewDecoder(f)
-					retries++
-					continue
+	errRetry := errors.New("retry")
+	errDone := errors.New("done")
+	waitRead := func() error {
+		select {
+		case e := <-fileWatcher.Events():
+			switch e.Op {
+			case fsnotify.Write:
+				dec = json.NewDecoder(f)
+				return nil
+			case fsnotify.Rename, fsnotify.Remove:
+				<-notifyRotate
+				if err := handleRotate(); err != nil {
+					return err
 				}
-
-				// io.ErrUnexpectedEOF is returned from json.Decoder when there is
-				// remaining data in the parser's buffer while an io.EOF occurs.
-				// If the json logger writes a partial json log entry to the disk
-				// while at the same time the decoder tries to decode it, the race condition happens.
-				if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
-					reader := io.MultiReader(dec.Buffered(), f)
-					dec = json.NewDecoder(reader)
-					retries++
-					continue
+				return nil
+			}
+			return errRetry
+		case err := <-fileWatcher.Errors():
+			logrus.Debugf("logger got error watching file: %v", err)
+			// Something happened, let's try and stay alive and create a new watcher
+			if retries <= 5 {
+				fileWatcher, err = watchFile(name)
+				if err != nil {
+					return err
 				}
-
-				return
+				retries++
+				return errRetry
 			}
+			return err
+		case <-logWatcher.WatchClose():
+			fileWatcher.Remove(name)
+			return errDone
+		}
+	}
 
-			select {
-			case <-fileWatcher.Events():
-				dec = json.NewDecoder(f)
-				continue
-			case <-fileWatcher.Errors():
-				logWatcher.Err <- err
-				return
-			case <-logWatcher.WatchClose():
-				fileWatcher.Remove(name)
-				return
-			case <-notifyRotate:
-				f.Close()
-				fileWatcher.Remove(name)
-
-				// retry when the file doesn't exist
-				for retries := 0; retries <= 5; retries++ {
-					f, err = os.Open(name)
-					if err == nil || !os.IsNotExist(err) {
-						break
-					}
+	handleDecodeErr := func(err error) error {
+		if err == io.EOF {
+			for err := waitRead(); err != nil; err = waitRead() {
+				if err == errRetry {
+					// retry the waitRead
+					continue
 				}
+				return err
+			}
+			return nil
+		}
+		// try again because this shouldn't happen
+		if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
+			dec = json.NewDecoder(f)
+			retries++
+			return nil
+		}
+		// io.ErrUnexpectedEOF is returned from json.Decoder when there is
+		// remaining data in the parser's buffer while an io.EOF occurs.
+		// If the json logger writes a partial json log entry to the disk
+		// while at the same time the decoder tries to decode it, the race condition happens.
+		if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
+			reader := io.MultiReader(dec.Buffered(), f)
+			dec = json.NewDecoder(reader)
+			retries++
+			return nil
+		}
+		return err
+	}
 
-				if err = fileWatcher.Add(name); err != nil {
-					logWatcher.Err <- err
-					return
-				}
-				if err != nil {
-					logWatcher.Err <- err
+	// main loop
+	for {
+		msg, err := decodeLogLine(dec, l)
+		if err != nil {
+			if err := handleDecodeErr(err); err != nil {
+				if err == errDone {
 					return
 				}
-
-				dec = json.NewDecoder(f)
-				continue
+				// we got an unrecoverable error, so return
+				logWatcher.Err <- err
+				return
 			}
+			// ready to try again
+			continue
 		}
 
 		retries = 0 // reset retries since we've succeeded
-- 
2.7.3