Skip to content

Telemetry

etl4s provides a minimal telemetry interface. This interface exists to decouple telemetry from specific backends.

How It Works

The Etl4sTelemetry trait defines the core interface:

trait Etl4sTelemetry {
  def withSpan[T](name: String, attributes: (String, Any)*)(block: => T): T
  def addCounter(name: String, value: Long): Unit
  def setGauge(name: String, value: Double): Unit  
  def recordHistogram(name: String, value: Double): Unit
}

All etl4s pipeline run methods automatically look for Etl4sTelemetry in implicit scope.

The Tel object provides a convenient API with identical method names to the trait. By default, all Tel calls are no-ops with zero overhead until you provide an implementation.

Your implementation connects to: OpenTelemetry SDK, Prometheus, DataDog, New Relic, CloudWatch, or whatever you want.

Usage

val process = Transform[List[String], Int] { data =>
  Tel.withSpan("processing") {
    Tel.addCounter("items", data.size)
    data.map(_.length).sum
  }
}

/* Development: no-ops (zero cost) */
process.unsafeRun(data)

/* Production: your backend */
implicit val telemetry: Etl4sTelemetry = MyPrometheusProvider()
process.unsafeRun(data)

Key benefits:

  • Write business critical telemetry in business logic, not infrastructure code
  • Zero performance cost until enabled
  • Works on any platform: local JVM, Spark, Kubernetes, Lambda
  • No framework lock-in or context threading

Why Telemetry (sometimes) belongs in ETL Business Logic

In many web and OLTP programs, telemetry is often a cross-cutting concern separate from business logic. ETL is different. In OLAP processes, observability metrics are frequently business-critical (especially at the peripheries in Extractors and Loaders):

val processUsers = Transform[List[RawUser], List[ValidUser]] { rawUsers =>
  val validated = rawUsers.filter(isValid)
  val invalidCount = rawUsers.size - validated.size

  /* This IS business logic - the business needs these metrics */
  Tel.addCounter("users.processed", rawUsers.size) 
  Tel.addCounter("users.invalid", invalidCount)
  Tel.setGauge("data.quality.ratio", validated.size.toDouble / rawUsers.size)

  /* Business decision based on data quality */
  if (invalidCount > threshold) {
    Tel.addCounter("pipeline.quality.failures", 1)
    throw new DataQualityException("Too many invalid records")
  }

  validated
}

In the above example - these aren't just "monitoring metrics" - they're business KPIs:

  • Record counts determine billing and SLAs
  • Processing times affect customer experience
  • Data quality ratios trigger business alerts
  • Throughput metrics inform capacity planning

etl4s makes it safe to instrument business logic directly because Tel calls are zero-cost no-ops by default. You get the observability where it matters most - in the business context - without infrastructure coupling.

Implementation Examples

OpenTelemetry SDK

class OpenTelemetryProvider extends Etl4sTelemetry {
  private val tracer = GlobalOpenTelemetry.getTracer("my-app")
  private val meter = GlobalOpenTelemetry.getMeter("my-app")

  def withSpan[T](name: String, attributes: (String, Any)*)(block: => T): T = {
    val span = tracer.spanBuilder(name).startSpan()
    try block finally span.end()
  }

  def addCounter(name: String, value: Long): Unit = {
    meter.counterBuilder(name).build().add(value)
  }
  /* ... implement setGauge, recordHistogram */
}

Prometheus

class PrometheusProvider extends Etl4sTelemetry {
  def withSpan[T](name: String, attributes: (String, Any)*)(block: => T): T = {
    val timer = Timer.start()
    try block finally histogram.labels(name).observe(timer.observeDuration())
  }

  def addCounter(name: String, value: Long): Unit = {
    Counter.build().name(name).register().inc(value)
  }
  /* ... implement setGauge, recordHistogram */
}

Console (Built-in)

/* Development telemetry - prints to stdout */
implicit val telemetry: Etl4sTelemetry = Etl4sConsoleTelemetry()

Advanced Features

Span Attributes

Tel.withSpan("processing",
  "input.size" -> data.size,
  "batch.id" -> batchId
) {
  /* processing logic */
}

Nested Spans

Spans automatically nest when called within each other:

Tel.withSpan("outer") {
  val result = Tel.withSpan("inner") {
    /* nested processing */
    computeResult()
  }
  result
}

No-Op by Default

Without an Etl4sTelemetry, all calls are no-ops with zero overhead:

/* No implicit provider - all Tel calls do nothing */
Tel.withSpan("processing") { Tel.addCounter("processed", 1) }

API Reference

Tel Object

Method Description
Tel.withSpan(name)(block) Execute block in named span
Tel.addCounter(name, value) Increment counter
Tel.setGauge(name, value) Set gauge value
Tel.recordHistogram(name, value) Record histogram value

Etl4sTelemetry Interface

Method Description
withSpan(name, attrs*)(block) Create span around block
addCounter(name, value) Record counter increment
setGauge(name, value) Set gauge to value
recordHistogram(name, value) Record histogram measurement

Built-in Implementations

Implementation Description
Etl4sConsoleTelemetry() Prints to stdout
Etl4sNoOpTelemetry Silent no-op (default)