Telemetry

When writing ETL jobs, you often need to:

  • Count records processed, track durations, measure data quality
  • Ship metrics to Prometheus, DataDog, or whatever your infra uses
  • Have zero overhead in dev, real metrics in prod

Tel gives you this. It's a thin interface over your metrics backend: every call is a no-op by default and is wired up once you provide an implementation.

val process = Transform[List[String], Int] { data =>
  Tel.withSpan("processing") {
    Tel.addCounter("items", data.size)
    data.map(_.length).sum
  }
}

/* Development: no-ops (zero cost) */
process.unsafeRun(data)

/* Production: your backend */
implicit val telemetry: Etl4sTelemetry = MyPrometheusProvider()
process.unsafeRun(data)
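
One way a facade like Tel can stay silent until a provider is supplied is to keep the active provider in a dynamically scoped variable that defaults to a no-op. The sketch below illustrates that idea only. `MiniTelemetry`, `MiniNoOp`, `MiniRecorder`, and `MiniTel` are hypothetical names, and etl4s's internals may well work differently.

```scala
import scala.util.DynamicVariable

// All names here are hypothetical; this is not etl4s's actual implementation.
trait MiniTelemetry {
  def addCounter(name: String, value: Long): Unit
}

// Default provider: every call does nothing.
object MiniNoOp extends MiniTelemetry {
  def addCounter(name: String, value: Long): Unit = ()
}

// Example provider that sums every counter increment it receives.
class MiniRecorder extends MiniTelemetry {
  var total: Long = 0L
  def addCounter(name: String, value: Long): Unit = total += value
}

// Facade: forwards each call to whichever provider is active for the current run.
object MiniTel {
  private val current = new DynamicVariable[MiniTelemetry](MiniNoOp)

  def addCounter(name: String, value: Long): Unit =
    current.value.addCounter(name, value)

  // Runs `block` with `provider` active; falls back to MiniNoOp
  // when no implicit MiniTelemetry is in scope at the call site.
  def run[T](block: => T)(implicit provider: MiniTelemetry = MiniNoOp): T =
    current.withValue(provider)(block)
}
```

Because the provider is looked up when the job runs rather than when it is defined, the same job definition can run silently in one place and with a recorder in another. Passing the provider explicitly, as in `MiniTel.run(job())(recorder)`, behaves the same as having an implicit in scope.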

The Interface

trait Etl4sTelemetry {
  def withSpan[T](name: String, attributes: (String, Any)*)(block: => T): T
  def addCounter(name: String, value: Long): Unit
  def setGauge(name: String, value: Double): Unit
  def recordHistogram(name: String, value: Double): Unit
}

Your implementation connects these four methods to the OpenTelemetry SDK, Prometheus, DataDog, New Relic, CloudWatch, or whatever you use.
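
To make the contract concrete, here is a minimal sketch of an implementation that simply stores everything in memory, which can be handy in unit tests. `InMemoryTelemetry` is a hypothetical name, not part of etl4s, and the trait is repeated locally only so the snippet compiles on its own.

```scala
import scala.collection.mutable

// Local copy of the trait shown above, so the sketch is self-contained.
trait Etl4sTelemetry {
  def withSpan[T](name: String, attributes: (String, Any)*)(block: => T): T
  def addCounter(name: String, value: Long): Unit
  def setGauge(name: String, value: Double): Unit
  def recordHistogram(name: String, value: Double): Unit
}

// Hypothetical test double: keeps all values in memory (not thread-safe).
class InMemoryTelemetry extends Etl4sTelemetry {
  val counters   = mutable.Map.empty[String, Long].withDefaultValue(0L)
  val gauges     = mutable.Map.empty[String, Double]
  val histograms = mutable.Map.empty[String, Vector[Double]].withDefaultValue(Vector.empty)
  val spans      = mutable.ListBuffer.empty[String]

  def withSpan[T](name: String, attributes: (String, Any)*)(block: => T): T = {
    spans += name // remember the span name on entry, then run the block
    block
  }

  // Counters accumulate, gauges keep the last value, histograms keep every sample.
  def addCounter(name: String, value: Long): Unit = counters(name) += value
  def setGauge(name: String, value: Double): Unit = gauges(name) = value
  def recordHistogram(name: String, value: Double): Unit =
    histograms(name) = histograms(name) :+ value
}
```

A test can then run a pipeline with an `InMemoryTelemetry` instance and assert on `counters`, `gauges`, `histograms`, or `spans` afterwards.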

Why Telemetry in ETL Business Logic

In web apps, telemetry is often a cross-cutting concern. ETL is different. In batch/streaming jobs, metrics are frequently business-critical:

val processUsers = Transform[List[RawUser], List[ValidUser]] { rawUsers =>
  val validated = rawUsers.filter(isValid)
  val invalidCount = rawUsers.size - validated.size

  /* These ARE business metrics */
  Tel.addCounter("users.processed", rawUsers.size)
  Tel.addCounter("users.invalid", invalidCount)
  /* Skip the ratio for an empty batch: 0.0 / 0 would report NaN */
  if (rawUsers.nonEmpty)
    Tel.setGauge("data.quality.ratio", validated.size.toDouble / rawUsers.size)

  if (invalidCount > threshold) {
    Tel.addCounter("pipeline.quality.failures", 1)
    throw new DataQualityException("Too many invalid records")
  }

  validated
}

These aren't just "monitoring metrics" - they're business KPIs:

  • Record counts determine billing and SLAs
  • Data quality ratios trigger business alerts
  • Throughput metrics inform capacity planning

Implementation Examples

OpenTelemetry SDK

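import io.opentelemetry.api.GlobalOpenTelemetry

class OpenTelemetryProvider extends Etl4sTelemetry {
  private val tracer = GlobalOpenTelemetry.getTracer("my-app")
  private val meter = GlobalOpenTelemetry.getMeter("my-app")

  def withSpan[T](name: String, attributes: (String, Any)*)(block: => T): T = {
    val builder = tracer.spanBuilder(name)
    /* Attribute values are stringified here for simplicity */
    attributes.foreach { case (k, v) => builder.setAttribute(k, s"$v") }
    val span = builder.startSpan()
    /* makeCurrent() lets spans started inside the block become children */
    val scope = span.makeCurrent()
    try block finally { scope.close(); span.end() }
  }

  def addCounter(name: String, value: Long): Unit = {
    meter.counterBuilder(name).build().add(value)
  }
  /* ... implement setGauge, recordHistogram */
}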

Prometheus

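/* Assumes the Prometheus Java simpleclient; the metric name and help text are illustrative */
import io.prometheus.client.{Counter, Histogram}
import java.util.concurrent.ConcurrentHashMap

class PrometheusProvider extends Etl4sTelemetry {
  /* One histogram for all spans, labelled by span name */
  private val spanSeconds = Histogram.build()
    .name("span_duration_seconds").help("Span duration").labelNames("span").register()
  /* Registering the same metric twice throws, so counters are cached */
  private val counters = new ConcurrentHashMap[String, Counter]()

  def withSpan[T](name: String, attributes: (String, Any)*)(block: => T): T = {
    val timer = spanSeconds.labels(name).startTimer()
    try block finally timer.observeDuration()
  }

  def addCounter(name: String, value: Long): Unit = {
    /* Prometheus metric names cannot contain dots */
    val metric = name.replace('.', '_')
    counters.computeIfAbsent(metric, n => Counter.build().name(n).help(name).register()).inc(value.toDouble)
  }
  /* ... implement setGauge, recordHistogram */
}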

Console (Built-in)

/* Development telemetry - prints to stdout */
implicit val telemetry: Etl4sTelemetry = Etl4sConsoleTelemetry()

Nested Spans

A span opened inside another span's block nests under it automatically:

Tel.withSpan("outer") {
  Tel.withSpan("inner") {
    computeResult()
  }
}
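
How nesting is tracked depends on the provider. As a rough illustration, a provider can keep a stack of open spans and treat the top of the stack as the parent of any span that starts next. `StackTracer` below is a hypothetical, single-threaded sketch of that idea, not the mechanism etl4s or any particular backend uses.

```scala
import scala.collection.mutable

// Hypothetical tracer: records each finished span together with its parent's name.
class StackTracer {
  // Open spans, innermost first.
  private var active: List[String] = Nil
  // Finished spans in completion order, as (name, parent) pairs.
  val finished = mutable.ListBuffer.empty[(String, Option[String])]

  def withSpan[T](name: String)(block: => T): T = {
    val parent = active.headOption // innermost open span, if any
    active = name :: active
    try block
    finally {
      active = active.tail         // close this span even if the block throws
      finished += ((name, parent))
    }
  }
}
```

Inner spans finish before the spans that enclose them, so `finished` lists children ahead of their parents.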

Span Attributes

Tel.withSpan("processing",
  "input.size" -> data.size,
  "batch.id" -> batchId
) {
  /* processing logic */
}
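
Attributes are typed as `(String, Any)*`, so a provider has to decide how to hand loosely typed values to its backend. One simple option, sketched below, is to stringify them into a map. `SpanAttributes.normalize` is a hypothetical helper, not part of etl4s.

```scala
// Hypothetical helper a provider could call inside withSpan.
object SpanAttributes {
  // Converts loosely typed attributes to strings.
  // If a key appears more than once, the last value wins.
  def normalize(attributes: (String, Any)*): Map[String, String] =
    attributes.map { case (key, value) => key -> s"$value" }.toMap
}
```

When the attributes already live in a collection, they can be forwarded to a varargs parameter with `: _*`, for example `SpanAttributes.normalize(common :+ ("batch.id" -> 42L): _*)` where `common` is a `Seq[(String, Any)]`.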

API Reference

Tel Object

Method                              Description
Tel.withSpan(name)(block)           Execute block in named span
Tel.addCounter(name, value)         Increment counter
Tel.setGauge(name, value)           Set gauge value
Tel.recordHistogram(name, value)    Record histogram value

Built-in Implementations

Implementation             Description
Etl4sConsoleTelemetry()    Prints to stdout
Etl4sNoOpTelemetry         Silent no-op (default)