序
本文主要研究一下 dapr 的 fswatcher
fswatcher
dapr/pkg/fswatcher/fswatcher.go
import (
"context"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
)
// Watch watches dir for file creations and writes and signals eventCh after
// each one.
//
// When a Create or Write event whose name contains dir is observed, Watch waits
// one second so that related updates can land, then sends an empty struct on
// eventCh. It blocks until ctx is cancelled, the watcher reports an error, or
// the watcher's event channel is closed; in all of those cases it returns nil.
// A non-nil error is returned only when the watcher cannot be created or dir
// cannot be added to it.
func Watch(ctx context.Context, dir string, eventCh chan<- struct{}) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return errors.Wrap(err, "failed to create watcher")
	}
	defer watcher.Close()

	if err := watcher.Add(dir); err != nil {
		return errors.Wrap(err, "watcher error")
	}

	for {
		select {
		case event, ok := <-watcher.Events:
			if !ok {
				// The watcher was closed underneath us; receiving from a closed
				// channel never blocks, so bail out instead of spinning.
				return nil
			}
			// Op is a bitmask and may carry several operations at once, so test
			// the bits rather than comparing for equality.
			if event.Op&(fsnotify.Create|fsnotify.Write) == 0 {
				continue
			}
			if !strings.Contains(event.Name, dir) {
				continue
			}

			// Give time for other updates to occur, but stop waiting as soon as
			// ctx is cancelled.
			settle := time.NewTimer(time.Second)
			select {
			case <-settle.C:
			case <-ctx.Done():
				settle.Stop()
				return nil
			}

			// Do not block forever if the receiver has stopped listening.
			select {
			case eventCh <- struct{}{}:
			case <-ctx.Done():
				return nil
			}
		case <-watcher.Errors:
			// Any watcher error ends the watch. nil is returned on purpose to
			// keep the contract existing callers rely on.
			return nil
		case <-ctx.Done():
			return nil
		}
	}
}
Watch 方法使用 fsnotify 的 watcher 来监听文件,之后通过 for 循环进行 select,如果监听到 fsnotify.Create 或者 fsnotify.Write 的时候判断 event.Name 是否包含 dir,如果包含则 sleep 一秒然后通知 eventCh
Add
github.com/fsnotify/fsnotify@v1.4.9/kqueue.go
// Add begins watching the named file or directory. Directories are watched
// non-recursively.
func (w *Watcher) Add(name string) error {
	// Flag name in externalWatches before handing off to addWatch. The lock is
	// released first because addWatch acquires w.mu itself.
	w.mu.Lock()
	w.externalWatches[name] = true
	w.mu.Unlock()

	if _, err := w.addWatch(name, noteAllEvents); err != nil {
		return err
	}
	return nil
}
Add 方法设置 externalWatches[name] 为 true,然后执行 addWatch(name, noteAllEvents)
addWatch
github.com/fsnotify/fsnotify@v1.4.9/kqueue.go
// addWatch adds name to the watched file set.
// The flags are interpreted as described in kevent(2).
// Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks.
//
// An empty path together with a nil error is returned when name is a socket,
// a named pipe, or a symlink that cannot be resolved or stat'ed; such paths
// are skipped rather than reported as failures. A non-nil error is returned
// when the watcher is closed, when the initial Lstat or the open fails, when
// registering with the kqueue fails, or when watching a directory's entries
// fails.
func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
var isDir bool
// Make ./name and name equivalent
name = filepath.Clean(name)
// Under the lock: reject a closed watcher and look up any existing watch
// for name.
w.mu.Lock()
if w.isClosed {w.mu.Unlock()
return "", errors.New("kevent instance already closed")
}
watchfd, alreadyWatching := w.watches[name]
// We already have a watch, but we can still override flags.
if alreadyWatching {isDir = w.paths[watchfd].isDir
}
w.mu.Unlock()
// First time this path is seen: inspect it and open a descriptor for it.
if !alreadyWatching {fi, err := os.Lstat(name)
if err != nil {return "", err}
// Don't watch sockets.
if fi.Mode()&os.ModeSocket == os.ModeSocket {return "", nil}
// Don't watch named pipes.
if fi.Mode()&os.ModeNamedPipe == os.ModeNamedPipe {return "", nil}
// Follow Symlinks
// Unfortunately, Linux can add bogus symlinks to watch list without
// issue, and Windows can't do symlinks period (AFAIK). To maintain
// consistency, we will act like everything is fine. There will simply
// be no file events for broken symlinks.
// Hence the returns of nil on errors.
if fi.Mode()&os.ModeSymlink == os.ModeSymlink {name, err = filepath.EvalSymlinks(name)
if err != nil {return "", nil}
// name now holds the resolved target; if that target is already watched,
// return it without registering anything new.
w.mu.Lock()
_, alreadyWatching = w.watches[name]
w.mu.Unlock()
if alreadyWatching {return name, nil}
// Re-stat the resolved target so that fi (used for isDir below) describes
// it rather than the link.
fi, err = os.Lstat(name)
if err != nil {return "", nil}
}
// Open the path to obtain the descriptor handed to register; -1 signals
// that the open failed.
watchfd, err = unix.Open(name, openMode, 0700)
if watchfd == -1 {return "", err}
isDir = fi.IsDir()}
// Register watchfd on w.kq with the requested flags. This runs for new and
// already-watched paths alike, which is how flags get overridden.
// NOTE(review): on failure watchfd is closed even when it came from an
// existing entry in w.watches, and that entry is not removed here — confirm
// whether this is intended.
const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE
if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil {unix.Close(watchfd)
return "", err
}
// Record the new descriptor in both lookup maps (path -> fd, fd -> path info).
if !alreadyWatching {w.mu.Lock()
w.watches[name] = watchfd
w.paths[watchfd] = pathInfo{name: name, isDir: isDir}
w.mu.Unlock()}
if isDir {
// Watch the directory if it has not been watched before,
// or if it was watched before, but perhaps only a NOTE_DELETE (watchDirectoryFiles)
w.mu.Lock()
// watchDir is true only when the new flags include NOTE_WRITE and either the
// directory is new or its previously stored flags lacked NOTE_WRITE.
watchDir := (flags&unix.NOTE_WRITE) == unix.NOTE_WRITE &&
(!alreadyWatching || (w.dirFlags[name]&unix.NOTE_WRITE) != unix.NOTE_WRITE)
// Store flags so this watch can be updated later
w.dirFlags[name] = flags
w.mu.Unlock()
// The lock is released before descending: watchDirectoryFiles takes w.mu
// itself.
if watchDir {if err := w.watchDirectoryFiles(name); err != nil {return "", err}
}
}
return name, nil
}
addWatch 方法针对尚未 watch 的 name 执行 os.Lstat(name) 及 unix.Open(name, openMode, 0700);之后以 registerAdd 调用 register 进行注册;另外针对 isDir 的情况执行 watchDirectoryFiles
watchDirectoryFiles
github.com/fsnotify/fsnotify@v1.4.9/kqueue.go
// watchDirectoryFiles mimics inotify when a watch is added on a directory:
// every entry directly inside dirPath is passed to internalWatch, and the path
// internalWatch returns is recorded in fileExists. The first error encountered
// aborts the walk and is returned.
func (w *Watcher) watchDirectoryFiles(dirPath string) error {
	entries, err := ioutil.ReadDir(dirPath)
	if err != nil {
		return err
	}

	for _, entry := range entries {
		watched, watchErr := w.internalWatch(filepath.Join(dirPath, entry.Name()), entry)
		if watchErr != nil {
			return watchErr
		}

		// Key on the path returned by internalWatch, not the joined one; the
		// two may differ.
		w.mu.Lock()
		w.fileExists[watched] = true
		w.mu.Unlock()
	}

	return nil
}
watchDirectoryFiles 遍历 files,挨个执行 internalWatch
internalWatch
github.com/fsnotify/fsnotify@v1.4.9/kqueue.go
// internalWatch adds a watch for an entry found inside a watched directory and
// returns whatever addWatch returns for it.
//
// Regular entries are watched with noteAllEvents. Directories keep whatever
// flags are currently stored for them in dirFlags and additionally get
// NOTE_DELETE and NOTE_RENAME, mimicking Linux, which delivers delete events
// for subdirectories.
func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, error) {
	if !fileInfo.IsDir() {
		// watch file to mimic Linux inotify
		return w.addWatch(name, noteAllEvents)
	}

	// Read the stored flags under the lock; addWatch takes the lock itself,
	// so it must be released before the call.
	w.mu.Lock()
	dirFlags := w.dirFlags[name]
	w.mu.Unlock()

	return w.addWatch(name, dirFlags|unix.NOTE_DELETE|unix.NOTE_RENAME)
}
internalWatch 针对 dir 在已有 dirFlags 的基础上追加 NOTE_DELETE、NOTE_RENAME 后执行 addWatch;针对普通文件则以 noteAllEvents 执行 addWatch
小结
dapr 的 fswatcher 使用 fsnotify 的 watcher 来监听文件,之后通过 for 循环进行 select,如果监听到 fsnotify.Create 或者 fsnotify.Write 的时候判断 event.Name 是否包含 dir,如果包含则 sleep 一秒然后通知 eventCh。
doc
- dapr