FS2 java.nio.file.WatchService integration, file system changes as a stream
This post shows how to use FS2 version 0.10.0-M7 functional streams in Scala with Java NIO WatchService to detect file system changes.
This example is based on the Java listening for directory changes example at https://docs.oracle.com/javase/tutorial/essential/io/notification.html.
Imports
import java.io.File
import java.nio.file.{FileSystems, Path, WatchKey}
import java.nio.file.StandardWatchEventKinds._
import scala.collection.JavaConverters._
import cats.effect.IO
import fs2.{ Sink, Stream }ADT
We define an ADT for our events.
sealed abstract class FileEvent(val file: File) {
def fold[B](f1: Created => B, f2: Deleted => B, f3: Modified => B) =
this match {
case e @ Created(_) => f1(e)
case e @ Deleted(_) => f2(e)
case e @ Modified(_) => f3(e)
}
}
case class Created(override val file: File) extends FileEvent(file)
case class Deleted(override val file: File) extends FileEvent(file)
case class Modified(override val file: File) extends FileEvent(file)Steam[IO, FileEvent]
We define a stream to provide FileEvent’s based on a filesystem directory path. This follows the steps from https://docs.oracle.com/javase/tutorial/essential/io/notification.html.
val fileListenerStream: String => Stream[IO, FileEvent] = pathString =>
Stream.bracket(IO(FileSystems.getDefault.newWatchService))(Stream(_), w => IO(w.close())) // (1)
.flatMap { watcher =>
Stream.eval {
IO(FileSystems.getDefault.getPath(pathString).register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)) // (2)
.map(_ => watcher)
}
}
.flatMap { watcher =>
def loop: Stream[IO, Seq[FileEvent]] = // (3)
Stream.eval[IO, Either[Throwable, Option[WatchKey]]](IO(Option(watcher.take)).attempt)
.flatMap { // (4)
case Right(Some(key)) =>
val items: Seq[FileEvent] =
key.pollEvents().asScala
.flatMap { event =>
if (event.kind == ENTRY_CREATE) {
Seq(Created(event.context.asInstanceOf[Path].toFile()))
} else if (event.kind == ENTRY_DELETE) {
Seq(Deleted(event.context.asInstanceOf[Path].toFile()))
} else if (event.kind == ENTRY_MODIFY) {
Seq(Modified(event.context.asInstanceOf[Path].toFile()))
} else {
// Drop OVERFLOW events, should handle these in a real scenario. Note we get these without subscribing
// https://docs.oracle.com/javase/7/docs/api/java/nio/file/StandardWatchEventKinds.html#OVERFLOW
Seq.empty
}
}
if (key.reset) Stream(items) ++ loop else Stream(items)
case Right(None) =>
loop
case Left(e:InterruptedException) =>
loop
case Left(e) =>
Stream.fail(e)
}
loop.flatMap { Stream.emits(_) } // (5)
}Stream.bracketis used to lift ajava.nio.file.WatchServicein to aStream[IO, WatchService]. Thebracketfunction will callWatchService.closeon completion.- We
flatMapon theStream[IO, WatchService]to register the directory to listen to events on. We then return the original WatchService. We useflatMapandStream.evalwith theIOtype as this step may raise an exception which will be handled by the underlyingIO. - We define a recursive function
loopwhich polls the WatchService for new events usingwatcher.take.watcher.takewill returnnullif no new events are available, it may throw anInterruptedExceptionor on success return aWatchKeyto access the underlying events. - We pattern match on the
watcher.takeresult lifting the events in to aStream[IO, Seq[FileEvent]] - We
flatMapover thelooprecursive function to flatten Stream[IO, Seq[FileEvent]]in toStream[IO, FileEvent]`
Example Usage
The below example will print any create, delete or modify event in /var/logs to stdout.
val debugSink = Sink[IO, FileEvent] {
case Created(file) => IO(println(s"Created(${file.getAbsolutePath})"))
case Deleted(file) => IO(println(s"Deleted(${file.getAbsolutePath})"))
case Modified(file) => IO(println(s"Modified(${file.getAbsolutePath})"))
}
fileListenerStream("/var/log")
.through(debugSink)
.run
.unsafeRunSync()