Config-Driven Pipelines¶
Make your pipelines configurable and reusable. Declare what each step needs, then provide config once.
import etl4s._
case class ApiConfig(url: String, key: String)
val extract = Extract("user123")
val enrich = Transform[String, String].requires[ApiConfig] { cfg => user =>
s"${cfg.key}: $user"
}
val pipeline = extract ~> enrich
// Provide config and run
pipeline.provide(ApiConfig("api.com", "SECRET")).unsafeRun(())
Config Inheritance & Propagation¶
Build modular configs with traits. etl4s automatically infers what your pipeline needs and propagates config through the entire pipeline:
trait HasDb { def dbUrl: String }
trait HasAuth { def apiKey: String }
val saveData = Load[String, Unit].requires[HasDb] { cfg => data =>
println(s"Saving to ${cfg.dbUrl}")
}
val fetchUser = Extract[String, User].requires[HasAuth] { cfg => id =>
fetchFromApi(cfg.apiKey, id)
}
// Combined config
case class AppConfig(dbUrl: String, apiKey: String) extends HasDb with HasAuth
val pipeline = fetchUser ~> process ~> saveData
// Config flows to all steps that need it automatically
pipeline.provide(AppConfig("jdbc:pg", "secret")).unsafeRun("user123")
Context¶
Context[T]
is a trait that provides organized factory methods for config-driven operations. For larger applications, extend it to keep your context-aware pipelines organized:
case class DbConfig(url: String, timeout: Int)
object DataPipeline extends Context[DbConfig] {
val fetch = Context.Extract[String, List[User]] { cfg => query =>
connectAndFetch(cfg.url, query)
}
val save = Context.Load[List[User], Unit] { cfg => users =>
Database.connect(cfg.url).save(users)
}
val pipeline = fetch ~> save
}
// Provide config once
DataPipeline.pipeline.provide(DbConfig("jdbc:pg", 5000)).unsafeRun("query")
Scala 2.x Note¶
Use explicit types for better inference: