/**
 * Creates the provider and builds the {@code Meter} that the client instruments are created from.
 *
 * @param otel the OpenTelemetry instance whose meter provider is used; when {@code null},
 *             {@link GlobalOpenTelemetry#get()} is used instead
 */
public InstrumentProvider(OpenTelemetry otel) {
    if (otel == null) {
        // By default, metrics are disabled, unless the OTel java agent is configured.
        // This allows to enable metrics without any code change.
        otel = GlobalOpenTelemetry.get();
    }
    // Instrumentation scope is the Pulsar client; its version is whatever PulsarVersion reports.
    this.meter = otel.getMeterProvider()
            .meterBuilder("org.apache.pulsar.client")
            .setInstrumentationVersion(PulsarVersion.getVersion())
            .build();
}
// Counter instrument named by MESSAGE_IN_COUNTER, unit "{message}".
// buildObserver() is used instead of build(); the values are presumably reported
// from a batch callback registered elsewhere (not visible here).
messageInCounter = meter.counterBuilder(MESSAGE_IN_COUNTER)
        .setUnit("{message}")
        .setDescription("The total number of messages received for this topic.")
        .buildObserver();
// Up/down counter instrument named by SUBSCRIPTION_COUNTER, unit "{subscription}".
// Built as an observer, so the value is presumably supplied by a callback registered
// elsewhere (not visible here).
subscriptionCounter = meter.upDownCounterBuilder(SUBSCRIPTION_COUNTER)
        .setUnit("{subscription}")
        .setDescription("The number of Pulsar subscriptions of the topic served by this broker.")
        .buildObserver();
DoubleHistogramBuilderbuilder= meter.histogramBuilder("pulsar.client.producer.message.send.duration") .setDescription("Publish latency experienced by the application, includes client batching time") .setUnit(Unit.Seconds.toString()) .setExplicitBucketBoundariesAdvice(latencyHistogramBuckets);
这是一个记录 Pulsar producer 发送延迟的指标,类型是 Histogram。
1 2 3 4 5 6
// Long-valued gauge instrument named by BACKLOG_QUOTA_AGE, unit "s".
// Built as an observer, so the value is presumably supplied by a callback registered
// elsewhere (not visible here).
backlogQuotaAge = meter.gaugeBuilder(BACKLOG_QUOTA_AGE)
        .ofLongs()
        .setUnit("s")
        .setDescription("The age of the oldest unacknowledged message (backlog).")
        .buildObserver();
publicstaticvoidregisterObservers() { Metermeter= MetricsRegistration.getMeter(); meter.gaugeBuilder("pulsar_producer_num_msg_send") .setDescription("The number of messages published in the last interval") .ofLongs() .buildWithCallback( r -> recordProducerMetrics(r, ProducerStats::getNumMsgsSent));