etl4s

Powerful, whiteboard-style ETL.

import etl4s._

val extract100  = Extract(100)
val half        = Transform[Int, Int](_ / 2)
val double      = Transform[Int, Int](_ * 2)
val consoleLoad = Load[String, Unit](println)
val dbLoad      = Load[String, Unit](s => println(s"saving to db: $s"))

val format = Transform[(Int, Int), String] {
  case (h, d) => s"half=$h, double=$d"
}

val pipeline =
  extract100 ~> (half & double) ~> format ~> (consoleLoad & dbLoad)

pipeline.unsafeRun()
// half=50, double=200
// saving to db: half=50, double=200

import etl4s._

case class DbConfig(host: String, port: Int)

val extract = Extract(List("a", "b", "c"))
val save = Load[List[String], Unit].requires[DbConfig] { db => data =>
  println(s"Saving ${data.size} rows to ${db.host}:${db.port}")
}

val pipeline = extract ~> save

pipeline.provide(DbConfig("localhost", 5432)).unsafeRun(())
// Saving 3 rows to localhost:5432

import etl4s._

val A = Node[String, String](identity)
  .lineage(name = "A", inputs = List("s1", "s2"), outputs = List("s3"))

val B = Node[String, String](identity)
  .lineage(name = "B", inputs = List("s3"), outputs = List("s4", "s5"))

Seq(A, B).toMermaid
graph LR
    classDef pipeline fill:#e1f5fe,stroke:#01579b,stroke-width:2px,color:#000
    classDef dataSource fill:#f3e5f5,stroke:#4a148c,stroke-width:2px,color:#000

    A["A"]
    B["B"]
    s1(["s1"])
    s2(["s2"])
    s3(["s3"])
    s4(["s4"])
    s5(["s5"])

    s1 --> A
    s2 --> A
    A --> s3
    s3 --> B
    B --> s4
    B --> s5

    class A pipeline
    class B pipeline
    class s1,s2,s3,s4,s5 dataSource

import etl4s._

val process = Transform[List[String], Int] { data =>
  Tel.withSpan("processing") {
    Tel.addCounter("items", data.size)
    data.map(_.length).sum
  }
}

val data = List("a", "bb", "ccc")

// Dev: no-ops (zero cost)
process.unsafeRun(data)

// Prod: plug in your backend
implicit val tel: Etl4sTelemetry = MyOtelProvider()
process.unsafeRun(data)

Single file. Zero dependencies.

A header-file lib (not a framework) that lets you structure code like whiteboard diagrams. Chain with ~>, parallelize with &, inject config with .requires. Works anywhere: scripts, Spark, Flink, your server.

// That's it. One import.
import etl4s._
val pipeline =
  extract ~> transform ~> load

Pipelines as values.

Lazy, reified, composable. Pass them around, test them in isolation, generate diagrams from them. Teams share pipelines like Lego bricks. Refactoring is safe because types enforce the boundaries.
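
For example, a small sketch (using only the constructors from the examples above; node names are made up) of one node shared by two pipelines and tested on its own:

import etl4s._

// One reusable node...
val clean = Transform[String, String](_.trim.toLowerCase)

// ...shared by two pipelines built from the same parts
val toConsole = Extract("  HeLLo  ") ~> clean ~> Load[String, Unit](println)
val shout     = Extract("  World  ") ~> clean ~> Transform[String, String](_.toUpperCase) ~> Load[String, Unit](println)

// Nothing runs until asked, and each node is testable in isolation
assert(clean.unsafeRun("  HeLLo  ") == "hello")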

(~>) is just *chef's kiss*. There are so many synergies here, haven't pushed for something this hard in a while. — Sr Engineering Manager, Instacart

For engineers & teams.

Write e ~> t ~> l. Types must match or it won't compile. One core abstraction: Node[In, Out]. Structure survives people leaving. New hires read code and get it. Auto-generated diagrams document your pipelines.

E[A, Int] ~> T[Int, Str] ~> L[Str, B]
Mismatched types won't compile.
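
Concretely, with the same Extract / Transform / Load constructors as the first example (node names are illustrative):

import etl4s._

val nums  = Extract(42)
val toStr = Transform[Int, String](n => s"n=$n")
val out   = Load[String, Unit](println)

val ok = nums ~> toStr ~> out   // Int feeds Int, String feeds String: compiles
// val bad = nums ~> out        // Int does not feed String: won't compile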

Under the hood:

A lightweight effect system with one core Node[-In, +Out] abstraction. The ~> operator infers and merges environments automatically. Details in FAQ.
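
A rough sketch of what that means in practice, using the Node constructor from the lineage example above (everything chains because everything is a node; names are illustrative):

import etl4s._

// A node built directly...
val trim = Node[String, String](_.trim)

// ...chains with a Transform just like anything else
val length  = Transform[String, Int](_.length)
val measure = trim ~> length

measure.unsafeRun("  hello  ")   // 5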

... (etl4s has) most of the advantages of full blown "effect systems" without the complexities, and awkward monad syntax! — u/RiceBroad4552

Automatic environment inference.

Chain nodes with different dependencies. The compiler merges their requirements and gives you typed, config-driven pipelines for free!

Needs[Db] ~> Needs[Api] = Needs[Db & Api]
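
A sketch of how that can look, assuming Transform supports the same .requires factory shown for Load earlier and that .provide accepts one value satisfying the merged requirement (Db and Api here are illustrative traits, not part of etl4s):

import etl4s._

trait Db  { def dbUrl: String  }
trait Api { def apiKey: String }

val fetch = Extract(List("a", "b", "c"))

val enrich = Transform[List[String], List[String]].requires[Api] { api => rows =>
  rows.map(r => s"$r:${api.apiKey}")
}

val save = Load[List[String], Unit].requires[Db] { db => rows =>
  println(s"writing ${rows.size} rows to ${db.dbUrl}")
}

// enrich needs Api, save needs Db: the chained pipeline needs Db & Api
val pipeline = fetch ~> enrich ~> save

// One value satisfies both requirements
val config = new Db with Api {
  val dbUrl  = "jdbc:postgresql://localhost/app"
  val apiKey = "secret"
}

pipeline.provide(config).unsafeRun(())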

Battle-tested at Instacart 🥕


Get started