diff --git a/aws-terraform/main.tf b/aws-terraform/main.tf index a8dcff9..5c1df4f 100644 --- a/aws-terraform/main.tf +++ b/aws-terraform/main.tf @@ -172,6 +172,25 @@ resource "aws_iam_role_policy" "flink_app_metrics_policy" { }) } +resource "aws_iam_role_policy" "flink_app_secrets_manager_policy" { + name = "SecretsManagerAccessPolicy" + role = aws_iam_role.flink_application_role.id + policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + Action = [ + "secretsmanager:GetSecretValue", + "secretsmanager:DescribeSecret" + ], + Effect = "Allow", + Resource = "*" + } + ] + }) +} + + # Reference: https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/kinesisanalyticsv2_application resource "aws_kinesisanalyticsv2_application" "flink_demo_tf" { name = "flink-tf-demo-application" @@ -203,6 +222,13 @@ resource "aws_kinesisanalyticsv2_application" "flink_demo_tf" { EVENTS_INGRESS_STREAM_DEFAULT = "${aws_kinesis_stream.flink_demo_ingress.name}" EVENTS_EGRESS_STREAM_DEFAULT = "${aws_kinesis_stream.flink_demo_egress.name}" AWS_REGION = data.aws_region.current.name + ENVIRONMENT = "demo" + NAMESPACE = "sandbox-demo" + OTEL_SDK_DISABLED: "false" + OTEL_SERVICE_NAME: "sandbox-statefun" + OTEL_EXPORTER_OTLP_ENDPOINT = "https://otlp.nr-data.net:443" + OTEL_EXPORTER_OTLP_PROTOCOL: "http/protobuf" + OTEL_EXPORTER_OTLP_HEADERS = "!secret:OTEL_EXPORTER_OTLP_HEADERS" } } } @@ -216,7 +242,7 @@ resource "aws_kinesisanalyticsv2_application" "flink_demo_tf" { monitoring_configuration { configuration_type = "CUSTOM" log_level = "INFO" - metrics_level = "TASK" + metrics_level = "APPLICATION" } parallelism_configuration { auto_scaling_enabled = false diff --git a/pom.xml b/pom.xml index 5d820a5..16beac3 100644 --- a/pom.xml +++ b/pom.xml @@ -14,10 +14,11 @@ Stateful Functions Embedded Java Example - UTF-8 - 3.3-1.18 + UTF-8 + + 3.3.0.1-1.18 1.18.1 - 3.7.1 + 3.25.6 11 ${java.version} ${java.version} @@ -102,6 +103,10 @@ aws-kinesisanalytics-runtime ${kda.runtime.version} + + software.amazon.awssdk + secretsmanager + @@ -143,6 +148,23 @@ 2.12.1 + + + io.opentelemetry + opentelemetry-sdk + 1.49.0 + + + io.opentelemetry + opentelemetry-sdk-metrics + 1.49.0 + + + io.opentelemetry + opentelemetry-exporter-otlp + 1.49.0 + + ch.qos.logback logback-classic diff --git a/src/main/java/com/example/stateful_functions/Configuration.java b/src/main/java/com/example/stateful_functions/Configuration.java index 5984abb..4a30036 100644 --- a/src/main/java/com/example/stateful_functions/Configuration.java +++ b/src/main/java/com/example/stateful_functions/Configuration.java @@ -4,37 +4,62 @@ import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient; +import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest; import java.lang.reflect.Field; -import java.util.Locale; -import java.util.Objects; -import java.util.Optional; -import java.util.Properties; +import java.util.*; public class Configuration { + private static Logger LOG = LoggerFactory.getLogger(Configuration.class); public static Properties properties = getProperties(); + public static final String ENVIRONMENT = properties.getOrDefault("ENVIRONMENT", "local").toString(); + public static String NAMESPACE = properties.getOrDefault("NAMESPACE", "jetstream-local").toString(); + + public static String AWS_REGION = properties.getOrDefault("AWS_REGION", "us-east-1").toString(); + public static boolean OTEL_SDK_DISABLED = properties.getOrDefault("OTEL_SDK_DISABLED", "true").equals("true"); + public static String OTEL_SERVICE_NAME = properties.getOrDefault("OTEL_SERVICE_NAME", "jetstream-statefun").toString(); + public static String OTEL_EXPORTER_OTLP_ENDPOINT = properties.getOrDefault("OTEL_EXPORTER_OTLP_ENDPOINT", "").toString(); + public static String OTEL_EXPORTER_OTLP_HEADERS = properties.getOrDefault("OTEL_EXPORTER_OTLP_HEADERS", "").toString(); + public static String OTEL_EXPORTER_OTLP_PROTOCOL = properties.getOrDefault("OTEL_EXPORTER_OTLP_PROTOCOL", "").toString(); + public static String OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE = + properties.getOrDefault("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE","Delta").toString(); + public static long OTEL_METRIC_EXPORT_INTERVAL = + Long.parseLong(properties.getOrDefault("OTEL_METRIC_EXPORT_INTERVAL", "60000").toString()); + public static long OTEL_METRIC_EXPORT_TIMEOUT = + Long.parseLong(properties.getOrDefault("OTEL_METRIC_EXPORT_TIMEOUT", "30000").toString()); + + public static String INGRESS_KINESIS_STREAM_NAME = properties.getOrDefault("EVENTS_INGRESS_STREAM_DEFAULT", "example-ingress-stream").toString(); public static String EGRESS_KINESIS_STREAM_NAME = properties.getOrDefault("EVENTS_EGRESS_STREAM_DEFAULT", "example-egress-stream").toString(); public static boolean IS_LOCAL_DEV = properties.getOrDefault("IS_LOCAL_DEV", "false").equals("true"); + + public static boolean USE_ENHANCED_FANOUT = properties.getOrDefault("USE_ENHANCED_FANOUT", "true").equals("true"); public static String ENHANCED_FANOUT_NAME = properties.getOrDefault("ENHANCED_FANOUT_NAME", "example-enhanced-fanout").toString(); public static String APP_VERSION = properties.getOrDefault("app.version", "0.1").toString(); + public static final AwsRegion getAwsRegion() { + return getAwsRegion(properties); + } - if (properties.containsKey("AWS_REGION") && !properties.containsKey("AWS_ENDPOINT")){ - return AwsRegion.ofId(properties.get("AWS_REGION").toString()); + private static AwsRegion getAwsRegion(Properties _properties) { + + if (_properties.containsKey("AWS_REGION") && !_properties.containsKey("AWS_ENDPOINT")){ + return AwsRegion.ofId(_properties.get("AWS_REGION").toString()); } else { AwsRegion region = AwsRegion.ofCustomEndpoint( - properties.getOrDefault("AWS_ENDPOINT", "https://localhost:4566").toString(), - properties.getOrDefault("AWS_REGION", "us-east-1").toString() + _properties.getOrDefault("AWS_ENDPOINT", "https://localhost:4566").toString(), + _properties.getOrDefault("AWS_REGION", "us-east-1").toString() ); try { @@ -45,10 +70,10 @@ public static final AwsRegion getAwsRegion() { { field.set(region, "http://host.docker.internal:4566"); } - } catch (Exception ex) { + } + catch (Exception ex) { } return region; - } } @@ -76,7 +101,58 @@ private static Properties getProperties() { catch (Exception x) { LOG.warn(x.getMessage(), x); } - return properties; + + return resolveSecretValueReferences(properties); } + // Replace property values of the form "!secret:" with the actual secret value from AWS Secrets Manager + private static Properties resolveSecretValueReferences(Properties properties) { + + // Find property key/value pairs where the value has a secret reference + Map propertiesWithSecretsResolved = new HashMap<>(); + properties.stringPropertyNames().forEach(propertyName -> { + String propertyValue = properties.getProperty(propertyName); + if (propertyValue.startsWith("!secret:")) { + String secretName = propertyValue.substring(8); + propertiesWithSecretsResolved.put(propertyName, secretName); + } + }); + + // Nothing to do if there are no secrets to resolve + if (propertiesWithSecretsResolved.isEmpty()) { + return properties; + } + + String awsRegionId = properties.get("AWS_REGION").toString(); + LOG.info("Resolving configuration secrets using AWS_REGION={}", awsRegionId); + + // Create a Secrets Manager client + try (SecretsManagerClient client = SecretsManagerClient.builder() + .region(Region.of(awsRegionId)) + .build()) { + + // For each property with a secret reference, get the secret value from AWS Secrets Manager and replace + // the value in the properties map + propertiesWithSecretsResolved.entrySet().forEach(entry -> { + String propertyName = entry.getKey(); + String secretName = entry.getKey(); + + GetSecretValueRequest getSecretValueRequest = GetSecretValueRequest.builder() + .secretId(secretName) + .build(); + + try { + properties.put(propertyName, client.getSecretValue(getSecretValueRequest).secretString()); + } + catch (Exception e) { + LOG.warn("Failed to get secret value for {}", secretName, e); + } + }); + } + catch (Exception e) { + LOG.warn("Failed init SecretsManagerClient: ", e); + } + + return properties; + } } diff --git a/src/main/java/com/example/stateful_functions/Main.java b/src/main/java/com/example/stateful_functions/Main.java index da6c1ab..820aadf 100644 --- a/src/main/java/com/example/stateful_functions/Main.java +++ b/src/main/java/com/example/stateful_functions/Main.java @@ -13,6 +13,11 @@ public static void main(String... args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StatefulFunctionsConfig stateFunConfig = StatefulFunctionsConfig.fromEnvironment(env); + + stateFunConfig.setMetricFunctionNamespaceKey("ns"); + stateFunConfig.setMetricFunctionTypeKey("type"); + stateFunConfig.setMetricGroupAdapterFactoryClassName("com.example.stateful_functions.metrics.JetstreamMetricGroupAdapterFactory"); + stateFunConfig.setProvider((StatefulFunctionsUniverseProvider) (classLoader, statefulFunctionsConfig) -> { Modules modules = Modules.loadFromClassPath(stateFunConfig); return modules.createStatefulFunctionsUniverse(); diff --git a/src/main/java/com/example/stateful_functions/function/product/ProductStatefulFunction.java b/src/main/java/com/example/stateful_functions/function/product/ProductStatefulFunction.java index 7c0a947..a050475 100644 --- a/src/main/java/com/example/stateful_functions/function/product/ProductStatefulFunction.java +++ b/src/main/java/com/example/stateful_functions/function/product/ProductStatefulFunction.java @@ -101,11 +101,13 @@ private void handleFunctionSubscriptionEvent(Context context, CloudEvent subscri FunctionSubscriptionDetails subscriptionDetails = cloudEventDataAccess.toFunctionSubscriptionDetails(subscriptionEvent); FunctionSubscriber subscriber = FunctionSubscriberUtil.subscriberFromSubscription(subscriptionDetails); if (subscriptionDetails.getAction() == FunctionSubscriptionAction.UNSUBSCRIBE) { + context.metrics().counter("unsubscribed").inc(); subscribers.remove(subscriber.getSubscriberId()); return; } if (subscriptionDetails.getAction() == FunctionSubscriptionAction.SUBSCRIBE) { + context.metrics().counter("subscribed").inc(); subscribers.set(subscriber.getSubscriberId(), subscriber); } @@ -113,6 +115,7 @@ private void handleFunctionSubscriptionEvent(Context context, CloudEvent subscri } private void notifySubscriber(Context context, FunctionSubscriber subscriber, CloudEvent productEvent) { + context.metrics().counter("notify_subscriber").inc(); send(context, new FunctionType(subscriber.getNamespace(), subscriber.getType()), subscriber.getId(), productEvent); } diff --git a/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricCounter.java b/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricCounter.java new file mode 100644 index 0000000..16c61c4 --- /dev/null +++ b/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricCounter.java @@ -0,0 +1,49 @@ +package com.example.stateful_functions.metrics; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongGauge; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; + +public class JetstreamMetricCounter implements Counter { + + private final Counter flinkCounter = new SimpleCounter(); + + private final LongGauge otelGauge; + private final Attributes otelAttributes; + + public JetstreamMetricCounter(LongGauge otelGauge, Attributes otelAttributes) { + this.otelGauge = otelGauge; + this.otelAttributes = otelAttributes; + } + + @Override + public void inc() { + flinkCounter.inc(); + otelGauge.set(flinkCounter.getCount(), otelAttributes); + } + + @Override + public void inc(long l) { + otelGauge.set(flinkCounter.getCount() + l, otelAttributes); + flinkCounter.inc(l); + + } + + @Override + public void dec() { + flinkCounter.dec(); + otelGauge.set(flinkCounter.getCount(), otelAttributes); + } + + @Override + public void dec(long l) { + otelGauge.set(flinkCounter.getCount() - l, otelAttributes); + flinkCounter.dec(l); + } + + @Override + public long getCount() { + return flinkCounter.getCount(); + } +} diff --git a/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricGroupAdapter.java b/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricGroupAdapter.java new file mode 100644 index 0000000..5759d4a --- /dev/null +++ b/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricGroupAdapter.java @@ -0,0 +1,123 @@ +package com.example.stateful_functions.metrics; + +import com.example.stateful_functions.Configuration; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.LongGauge; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.metrics.*; +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Adapter for Flink's MetricGroup which creates wrappers around Counters that also report via OpenTelemetry. + */ +public class JetstreamMetricGroupAdapter implements MetricGroup { + + private final MetricGroup adapted; + private final OpenTelemetrySdk openTelemetrySdk; + + private static final char METRIC_SCOPE_DELIMITER = '.'; + + // Exclude this list of attributes from the OpenTelemetry metrics + private static final Set EXCLUDE_ATTRIBUTES = Set.of( + "host", // ="10.89.0.23" + "job_id", // ="279f294c8e2048d912878c11fd1f1348" + "operator_id", // ="6b87a4870d0e21cecbbe271bd893cfcc" + // "operator_name", // ="functions" + "subtask_index", // ="0" + "task_attempt_id", // ="7dfc52028fab716222444f4f29d14905" + "task_attempt_num", // ="0" + "task_id", // ="31284d56d1e2112b0f20099ee448a6a9" + "task_name", // ="feedback-union -> functions -> Sink: rad-output-egress-egress" + "tm_id" // ="10.89.0.23:35601-565f93" + ); + + public JetstreamMetricGroupAdapter(MetricGroup adapted, OpenTelemetrySdk openTelemetrySdk) { + this.adapted = adapted; + this.openTelemetrySdk = openTelemetrySdk; + } + + @Override + public Counter counter(String name) { + + final String flinkScope; + final Map variables; + if (adapted instanceof AbstractMetricGroup) { + AbstractMetricGroup abstractMetricGroup = (AbstractMetricGroup) adapted; + flinkScope = abstractMetricGroup.getLogicalScope(CharacterFilter.NO_OP_FILTER, METRIC_SCOPE_DELIMITER); + variables = adapted.getAllVariables().entrySet().stream() + .map(entry -> Map.entry(StringUtils.removeEnd(StringUtils.removeStart(entry.getKey(), "<"),">"), entry.getValue())) + .filter(entry -> !EXCLUDE_ATTRIBUTES.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + else { + flinkScope = "unresolvable.scope"; + variables = Map.of("adaptedClass", adapted.getClass().getName()); + } + LongGauge otelGauge = openTelemetrySdk.getMeter(OpenTelemetrySdkUtil.getServiceName()) + .gaugeBuilder(flinkScope + METRIC_SCOPE_DELIMITER + name) + .ofLongs() + .build(); + AttributesBuilder attributesBuilder = Attributes.builder(); + variables.forEach(attributesBuilder::put); + attributesBuilder.put("service.namespace", Configuration.NAMESPACE); + + return counter(name, new JetstreamMetricCounter(otelGauge, attributesBuilder.build())); + } + + @Override + public C counter(String name, C counter) { + return adapted.counter(name, counter); + } + + @Override + public > G gauge(String name, G g) { + return adapted.gauge(name, g); + } + + @Override + public H histogram(String name, H h) { + return adapted.histogram(name, h); + } + + @Override + public M meter(String name, M m) { + return adapted.meter(name, m); + } + + @Override + public MetricGroup addGroup(String name) { + return adapted.addGroup(name); + } + + @Override + public MetricGroup addGroup(String key, String value) { + return adapted.addGroup(key, value); + } + + @Override + public String[] getScopeComponents() { + return adapted.getScopeComponents(); + } + + @Override + public Map getAllVariables() { + return adapted.getAllVariables(); + } + + @Override + public String getMetricIdentifier(String metricName) { + return adapted.getMetricIdentifier(metricName); + } + + @Override + public String getMetricIdentifier(String metricName, CharacterFilter characterFilter) { + return adapted.getMetricIdentifier(metricName, characterFilter); + } +} diff --git a/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricGroupAdapterFactory.java b/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricGroupAdapterFactory.java new file mode 100644 index 0000000..fff819d --- /dev/null +++ b/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricGroupAdapterFactory.java @@ -0,0 +1,34 @@ +package com.example.stateful_functions.metrics; + +import io.opentelemetry.sdk.OpenTelemetrySdk; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.statefun.flink.core.metrics.MetricGroupAdapterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JetstreamMetricGroupAdapterFactory implements MetricGroupAdapterFactory { + + private static final Logger LOG = LoggerFactory.getLogger(JetstreamMetricGroupAdapterFactory.class); + + @Override + public MetricGroup createMetricGroupAdapter(MetricGroup metricGroup) { + try { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + if (classLoader == null) { + classLoader = JetstreamMetricGroupAdapterFactory.class.getClassLoader(); + } + Class sdkUtilClass = classLoader.loadClass("com.example.stateful_functions.metrics.OpenTelemetrySdkUtil"); + Object sdk = sdkUtilClass.getMethod("getSdk").invoke(null); + if (sdk instanceof OpenTelemetrySdk) { + LOG.warn("Creating JetstreamMetricGroupAdapter with OpenTelemetry SDK: {}@{}", sdk.getClass().getName(), sdk.hashCode()); + return new JetstreamMetricGroupAdapter(metricGroup, (OpenTelemetrySdk) sdk); + } + } catch (Exception x) { + LOG.error(x.getMessage(), x); + LOG.warn("Returning original MetricGroup due to errors resolving SDK"); + } + + LOG.warn("OpenTelemetry SDK is disabled. Returning original MetricGroup."); + return metricGroup; + } +} diff --git a/src/main/java/com/example/stateful_functions/metrics/OpenTelemetrySdkUtil.java b/src/main/java/com/example/stateful_functions/metrics/OpenTelemetrySdkUtil.java new file mode 100644 index 0000000..e6081b4 --- /dev/null +++ b/src/main/java/com/example/stateful_functions/metrics/OpenTelemetrySdkUtil.java @@ -0,0 +1,173 @@ +package com.example.stateful_functions.metrics; + +import com.example.stateful_functions.Configuration; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.protocol.types.Field; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.*; + +public class OpenTelemetrySdkUtil { + + private static final Logger LOG = LoggerFactory.getLogger(OpenTelemetrySdkUtil.class); + + private static boolean initialized = false; + private static OpenTelemetrySdk sdk = null; + + private static OpenTelemetrySdk initialized(OpenTelemetrySdk _sdk) { + initialized = true; + sdk = _sdk; + return sdk; + } + + public static String getServiceName() { + return Configuration.OTEL_SERVICE_NAME + '-' + Configuration.ENVIRONMENT; + } + + public static synchronized OpenTelemetrySdk getSdk() { + if (initialized) { + return sdk; + } + + // In the logging output, obfuscate headers values where 'key' is in the header name + String headersToLog; + if (StringUtils.isNotEmpty(Configuration.OTEL_EXPORTER_OTLP_HEADERS)) { + List items = new ArrayList<>(); + String[] headerArray = Configuration.OTEL_EXPORTER_OTLP_HEADERS.split(","); + for (String header : headerArray) { + String[] keyValue = header.split("="); + if (keyValue.length == 2) { + if (keyValue[0].contains("key")) { + items.add(String.format("%s=%s", keyValue[0], + new String(Base64.getEncoder().encode(keyValue[1].getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8))); + } + else { + items.add(header); + } + } + else { + items.add(header); + } + } + headersToLog = StringUtils.joinWith(",", items); + } + else { + headersToLog = Configuration.OTEL_EXPORTER_OTLP_HEADERS; + } + + LOG.warn("Initializing with OTEL_SDK_DISABLED={}, OTEL_EXPORTER_OTLP_PROTOCOL='{}', OTEL_EXPORTER_OTLP_ENDPOINT='{}', OTEL_EXPORTER_OTLP_HEADERS='{}'", + Configuration.OTEL_SDK_DISABLED, + Configuration.OTEL_EXPORTER_OTLP_PROTOCOL, + Configuration.OTEL_EXPORTER_OTLP_ENDPOINT, + headersToLog); + + if (Configuration.OTEL_SDK_DISABLED) { + LOG.warn("OpenTelemetry SDK is disabled. Skipping initialization."); + return initialized(null); + } + + if (StringUtils.isEmpty(Configuration.OTEL_EXPORTER_OTLP_PROTOCOL)) { + LOG.warn("OTLP protocol is not set. Skipping initialization."); + return initialized(null); + } + + if (StringUtils.isEmpty(Configuration.OTEL_EXPORTER_OTLP_ENDPOINT)) { + LOG.warn("OTLP endpoint is not set. Skipping initialization."); + return initialized(null); + } + + Resource serviceNameResourceResource = Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), getServiceName())); + + Map headers = new HashMap<>(); + + if (StringUtils.isNotEmpty(Configuration.OTEL_EXPORTER_OTLP_HEADERS)) { + String[] headerArray = Configuration.OTEL_EXPORTER_OTLP_HEADERS.split(","); + for (String header : headerArray) { + String[] keyValue = header.split("="); + if (keyValue.length == 2) { + headers.put(keyValue[0], keyValue[1]); + } + else { + LOG.warn("Header value '{}' parsed to {} token(s). Expected exactly 2", header, keyValue.length); + } + } + } + + final AggregationTemporalitySelector aggregationTemporalitySelector; + switch (Configuration.OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE.toUpperCase()) { + case "DELTA": + aggregationTemporalitySelector = AggregationTemporalitySelector.deltaPreferred(); + break; + case "CUMULATIVE": + aggregationTemporalitySelector = AggregationTemporalitySelector.alwaysCumulative(); + break; + case "LOWMEMORY": + aggregationTemporalitySelector = AggregationTemporalitySelector.lowMemory(); + break; + default: + LOG.warn("Unsupported OTLP metrics temporality preference: '{}'. Defaulting to DELTA.", Configuration.OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE); + aggregationTemporalitySelector = AggregationTemporalitySelector.deltaPreferred(); + break; + } + + final MetricExporter metricExporter; + final String metricEndpoint = Configuration.OTEL_EXPORTER_OTLP_ENDPOINT + "/v1/metrics"; + switch (Configuration.OTEL_EXPORTER_OTLP_PROTOCOL) { + case "http/protobuf": + LOG.info("Using OTLP HTTP/Protobuf"); + OtlpHttpMetricExporterBuilder builder = OtlpHttpMetricExporter.builder() + .setHeaders(() -> headers) + .setEndpoint(metricEndpoint) + .setTimeout(Duration.ofMillis(Configuration.OTEL_METRIC_EXPORT_TIMEOUT)) + .setAggregationTemporalitySelector(aggregationTemporalitySelector); + metricExporter = builder.build(); + break; + case "grpc": + LOG.info("Using OTLP gRPC"); + metricExporter = OtlpGrpcMetricExporter.builder() + .setHeaders(() -> headers) + .setEndpoint(metricEndpoint) + .setTimeout(Duration.ofMillis(Configuration.OTEL_METRIC_EXPORT_TIMEOUT)) + .setAggregationTemporalitySelector(aggregationTemporalitySelector) + .build(); + break; + default: + LOG.warn("Unsupported OTLP protocol: '{}'.", Configuration.OTEL_EXPORTER_OTLP_PROTOCOL); + return initialized(null); + } + MetricReader periodicReader = + PeriodicMetricReader.builder(metricExporter) + .setInterval(Duration.ofMillis(Configuration.OTEL_METRIC_EXPORT_INTERVAL)) + .build(); + + SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder() + .addResource(serviceNameResourceResource) + .registerMetricReader(periodicReader) + .build(); + + Runtime.getRuntime().addShutdownHook(new Thread(sdkMeterProvider::close)); + sdk = + OpenTelemetrySdk.builder() + .setMeterProvider(sdkMeterProvider) + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .build(); + return initialized(sdk); + } +}