FS2 java.nio.file.WatchService integration, file system changes as a stream

This post shows how to use FS2 version 0.10.0-M7 functional streams in Scala with Java NIO WatchService to detect file system changes.

This example is based on the Java listening for directory changes example at https://docs.oracle.com/javase/tutorial/essential/io/notification.html.

Imports

import java.io.File
import java.nio.file.{FileSystems, Path, WatchKey}
import java.nio.file.StandardWatchEventKinds._

import scala.collection.JavaConverters._

import cats.effect.IO
import fs2.{ Sink, Stream }

ADT

We define an ADT for our events.

sealed abstract class FileEvent(val file: File) {
  def fold[B](f1: Created => B, f2: Deleted => B, f3: Modified => B) =
    this match {
      case e @ Created(_)  => f1(e)
      case e @ Deleted(_)  => f2(e)
      case e @ Modified(_) => f3(e)
    }
}

case class Created(override val file: File) extends FileEvent(file)
case class Deleted(override val file: File) extends FileEvent(file)
case class Modified(override val file: File) extends FileEvent(file)

Steam[IO, FileEvent]

We define a stream to provide FileEvent’s based on a filesystem directory path. This follows the steps from https://docs.oracle.com/javase/tutorial/essential/io/notification.html.

val fileListenerStream: String => Stream[IO, FileEvent] = pathString =>
  Stream.bracket(IO(FileSystems.getDefault.newWatchService))(Stream(_), w => IO(w.close()))                        // (1)
    .flatMap { watcher =>
      Stream.eval {
        IO(FileSystems.getDefault.getPath(pathString).register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)) // (2)
          .map(_ => watcher)
      }
    }
    .flatMap { watcher =>
      def loop: Stream[IO, Seq[FileEvent]] = // (3)
        Stream.eval[IO, Either[Throwable, Option[WatchKey]]](IO(Option(watcher.take)).attempt)
          .flatMap {                         // (4)
            case Right(Some(key)) =>
              val items: Seq[FileEvent] =
                key.pollEvents().asScala
                  .flatMap { event =>
                    if (event.kind == ENTRY_CREATE) {
                      Seq(Created(event.context.asInstanceOf[Path].toFile()))
                    } else if (event.kind == ENTRY_DELETE) {
                      Seq(Deleted(event.context.asInstanceOf[Path].toFile()))
                    } else if (event.kind == ENTRY_MODIFY) {
                      Seq(Modified(event.context.asInstanceOf[Path].toFile()))
                    } else {
                      // Drop OVERFLOW events, should handle these in a real scenario. Note we get these without subscribing
                      // https://docs.oracle.com/javase/7/docs/api/java/nio/file/StandardWatchEventKinds.html#OVERFLOW
                      Seq.empty
                    }
                  }

              if (key.reset) Stream(items) ++ loop else Stream(items)

            case Right(None) =>
              loop

            case Left(e:InterruptedException) =>
              loop

            case Left(e) =>
              Stream.fail(e)
          }

      loop.flatMap { Stream.emits(_) }       // (5)
    }
  1. Stream.bracket is used to lift a java.nio.file.WatchService in to a Stream[IO, WatchService]. The bracket function will call WatchService.close on completion.
  2. We flatMap on the Stream[IO, WatchService] to register the directory to listen to events on. We then return the original WatchService. We use flatMap and Stream.eval with the IO type as this step may raise an exception which will be handled by the underlying IO.
  3. We define a recursive function loop which polls the WatchService for new events using watcher.take. watcher.take will return null if no new events are available, it may throw an InterruptedException or on success return a WatchKey to access the underlying events.
  4. We pattern match on the watcher.take result lifting the events in to a Stream[IO, Seq[FileEvent]]
  5. We flatMap over the loop recursive function to flatten Stream[IO, Seq[FileEvent]]in toStream[IO, FileEvent]`

Example Usage

The below example will print any create, delete or modify event in /var/logs to stdout.

val debugSink = Sink[IO, FileEvent] {
  case Created(file)  => IO(println(s"Created(${file.getAbsolutePath})"))
  case Deleted(file)  => IO(println(s"Deleted(${file.getAbsolutePath})"))
  case Modified(file) => IO(println(s"Modified(${file.getAbsolutePath})"))
}

fileListenerStream("/var/log")
  .through(debugSink)
  .run
  .unsafeRunSync()