FS2 java.nio.file.WatchService integration, file system changes as a stream
This post shows how to use FS2 version 0.10.0-M7 functional streams in Scala with Java NIO WatchService to detect file system changes.
This example is based on the Java listening for directory changes example at https://docs.oracle.com/javase/tutorial/essential/io/notification.html.
Imports
import java.io.File
import java.nio.file.{FileSystems, Path, WatchKey}
import java.nio.file.StandardWatchEventKinds._
import scala.collection.JavaConverters._
import cats.effect.IO
import fs2.{ Sink, Stream }
ADT
We define an ADT for our events.
sealed abstract class FileEvent(val file: File) {
def fold[B](f1: Created => B, f2: Deleted => B, f3: Modified => B) =
this match {
case e @ Created(_) => f1(e)
case e @ Deleted(_) => f2(e)
case e @ Modified(_) => f3(e)
}
}
case class Created(override val file: File) extends FileEvent(file)
case class Deleted(override val file: File) extends FileEvent(file)
case class Modified(override val file: File) extends FileEvent(file)
Steam[IO, FileEvent]
We define a stream to provide FileEvent’s based on a filesystem directory path. This follows the steps from https://docs.oracle.com/javase/tutorial/essential/io/notification.html.
val fileListenerStream: String => Stream[IO, FileEvent] = pathString =>
Stream.bracket(IO(FileSystems.getDefault.newWatchService))(Stream(_), w => IO(w.close())) // (1)
.flatMap { watcher =>
Stream.eval {
IO(FileSystems.getDefault.getPath(pathString).register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)) // (2)
.map(_ => watcher)
}
}
.flatMap { watcher =>
def loop: Stream[IO, Seq[FileEvent]] = // (3)
Stream.eval[IO, Either[Throwable, Option[WatchKey]]](IO(Option(watcher.take)).attempt)
.flatMap { // (4)
case Right(Some(key)) =>
val items: Seq[FileEvent] =
key.pollEvents().asScala
.flatMap { event =>
if (event.kind == ENTRY_CREATE) {
Seq(Created(event.context.asInstanceOf[Path].toFile()))
} else if (event.kind == ENTRY_DELETE) {
Seq(Deleted(event.context.asInstanceOf[Path].toFile()))
} else if (event.kind == ENTRY_MODIFY) {
Seq(Modified(event.context.asInstanceOf[Path].toFile()))
} else {
// Drop OVERFLOW events, should handle these in a real scenario. Note we get these without subscribing
// https://docs.oracle.com/javase/7/docs/api/java/nio/file/StandardWatchEventKinds.html#OVERFLOW
Seq.empty
}
}
if (key.reset) Stream(items) ++ loop else Stream(items)
case Right(None) =>
loop
case Left(e:InterruptedException) =>
loop
case Left(e) =>
Stream.fail(e)
}
loop.flatMap { Stream.emits(_) } // (5)
}
Stream.bracket
is used to lift ajava.nio.file.WatchService
in to aStream[IO, WatchService]
. Thebracket
function will callWatchService.close
on completion.- We
flatMap
on theStream[IO, WatchService]
to register the directory to listen to events on. We then return the original WatchService. We useflatMap
andStream.eval
with theIO
type as this step may raise an exception which will be handled by the underlyingIO
. - We define a recursive function
loop
which polls the WatchService for new events usingwatcher.take
.watcher.take
will returnnull
if no new events are available, it may throw anInterruptedException
or on success return aWatchKey
to access the underlying events. - We pattern match on the
watcher.take
result lifting the events in to aStream[IO, Seq[FileEvent]]
- We
flatMap
over theloop
recursive function to flatten Stream[IO, Seq[FileEvent]]in to
Stream[IO, FileEvent]`
Example Usage
The below example will print any create, delete or modify event in /var/logs
to stdout.
val debugSink = Sink[IO, FileEvent] {
case Created(file) => IO(println(s"Created(${file.getAbsolutePath})"))
case Deleted(file) => IO(println(s"Deleted(${file.getAbsolutePath})"))
case Modified(file) => IO(println(s"Modified(${file.getAbsolutePath})"))
}
fileListenerStream("/var/log")
.through(debugSink)
.run
.unsafeRunSync()