Skip to content

Common Patterns

Chain pipelines

import etl4s._

val A = Pipeline((i: Int) => i.toString)
val B = Pipeline((s: String) => s + "!")

val C = A ~> B  // Int => String

Parallel extraction with .zip

Flatten nested tuples from parallel operations:

val e1 = Extract(1)
val e2 = Extract("two")
val e3 = Extract(3.0)

val combined = (e1 & e2 & e3).zip  // (Int, String, Double) not ((Int, String), Double)

Debugging with .tap

Inspect values mid-pipeline without affecting the flow:

val pipeline = extract
  .tap(data => println(s"Extracted: $data"))
  ~> transform
  .tap(result => println(s"Transformed: $result"))
  ~> load

Sequential side-effects with >>

Run multiple effects in order, same input to each. Only the last result is returned:

val logStart  = Node[String, Unit](s => println(s"Starting: $s"))
val logMiddle = Node[String, Unit](s => println(s"Processing: $s"))
val process   = Node[String, Int](_.length)

val pipeline = logStart >> logMiddle >> process

pipeline.unsafeRun("hello")
// prints: Starting: hello
// prints: Processing: hello
// returns: 5

Useful for setup/teardown:

val clearCache = Node { println("Clearing cache...") }
val warmCache  = Node { println("Warming cache...") }

val pipeline = clearCache >> warmCache >> mainPipeline

Conditional branching

Route data based on conditions:

val classify = Node[Int, Int](identity)
  .If(_ < 0)(Node(_ => "negative"))
  .ElseIf(_ == 0)(Node(_ => "zero"))
  .Else(Node(_ => "positive"))

classify.unsafeRun(-5)  // "negative"
classify.unsafeRun(0)   // "zero"
classify.unsafeRun(10)  // "positive"

Error handling with .onFailure

Provide fallback values:

val risky = Node[String, Int](_.toInt)
  .onFailure(_ => -1)

risky.unsafeRun("42")   // 42
risky.unsafeRun("bad")  // -1

Retry with backoff

val flaky = Node[String, Response](callExternalApi)
  .withRetry(maxAttempts = 3, initialDelayMs = 100, backoffMultiplier = 2.0)

Reactive pipelines with Trace

Branch on upstream errors:

val upstream = Transform[String, Int] { input =>
  if (input.isEmpty) Trace.error("Empty input")
  input.length
}

val downstream = Transform[Int, String] { value =>
  if (Trace.hasErrors) "FALLBACK"
  else s"Length: $value"
}

val pipeline = upstream ~> downstream

pipeline.unsafeRun("")      // "FALLBACK"
pipeline.unsafeRun("hello") // "Length: 5"