diff --git a/aws-terraform/main.tf b/aws-terraform/main.tf
index a8dcff9..5c1df4f 100644
--- a/aws-terraform/main.tf
+++ b/aws-terraform/main.tf
@@ -172,6 +172,25 @@ resource "aws_iam_role_policy" "flink_app_metrics_policy" {
})
}
+resource "aws_iam_role_policy" "flink_app_secrets_manager_policy" {
+ name = "SecretsManagerAccessPolicy"
+ role = aws_iam_role.flink_application_role.id
+ policy = jsonencode({
+ Version = "2012-10-17",
+ Statement = [
+ {
+ Action = [
+ "secretsmanager:GetSecretValue",
+ "secretsmanager:DescribeSecret"
+ ],
+ Effect = "Allow",
+ Resource = "*"
+ }
+ ]
+ })
+}
+
+
# Reference: https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/kinesisanalyticsv2_application
resource "aws_kinesisanalyticsv2_application" "flink_demo_tf" {
name = "flink-tf-demo-application"
@@ -203,6 +222,13 @@ resource "aws_kinesisanalyticsv2_application" "flink_demo_tf" {
EVENTS_INGRESS_STREAM_DEFAULT = "${aws_kinesis_stream.flink_demo_ingress.name}"
EVENTS_EGRESS_STREAM_DEFAULT = "${aws_kinesis_stream.flink_demo_egress.name}"
AWS_REGION = data.aws_region.current.name
+ ENVIRONMENT = "demo"
+ NAMESPACE = "sandbox-demo"
+ OTEL_SDK_DISABLED: "false"
+ OTEL_SERVICE_NAME: "sandbox-statefun"
+ OTEL_EXPORTER_OTLP_ENDPOINT = "https://otlp.nr-data.net:443"
+ OTEL_EXPORTER_OTLP_PROTOCOL: "http/protobuf"
+ OTEL_EXPORTER_OTLP_HEADERS = "!secret:OTEL_EXPORTER_OTLP_HEADERS"
}
}
}
@@ -216,7 +242,7 @@ resource "aws_kinesisanalyticsv2_application" "flink_demo_tf" {
monitoring_configuration {
configuration_type = "CUSTOM"
log_level = "INFO"
- metrics_level = "TASK"
+ metrics_level = "APPLICATION"
}
parallelism_configuration {
auto_scaling_enabled = false
diff --git a/pom.xml b/pom.xml
index 5d820a5..16beac3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,10 +14,11 @@
Stateful Functions Embedded Java Example
- UTF-8
- 3.3-1.18
+ UTF-8
+
+ 3.3.0.1-1.18
1.18.1
- 3.7.1
+ 3.25.6
11
${java.version}
${java.version}
@@ -102,6 +103,10 @@
aws-kinesisanalytics-runtime
${kda.runtime.version}
+
+ software.amazon.awssdk
+ secretsmanager
+
@@ -143,6 +148,23 @@
2.12.1
+
+
+ io.opentelemetry
+ opentelemetry-sdk
+ 1.49.0
+
+
+ io.opentelemetry
+ opentelemetry-sdk-metrics
+ 1.49.0
+
+
+ io.opentelemetry
+ opentelemetry-exporter-otlp
+ 1.49.0
+
+
ch.qos.logback
logback-classic
diff --git a/src/main/java/com/example/stateful_functions/Configuration.java b/src/main/java/com/example/stateful_functions/Configuration.java
index 5984abb..4a30036 100644
--- a/src/main/java/com/example/stateful_functions/Configuration.java
+++ b/src/main/java/com/example/stateful_functions/Configuration.java
@@ -4,37 +4,62 @@
import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
+import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;
import java.lang.reflect.Field;
-import java.util.Locale;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
+import java.util.*;
public class Configuration {
+
private static Logger LOG = LoggerFactory.getLogger(Configuration.class);
public static Properties properties = getProperties();
+ public static final String ENVIRONMENT = properties.getOrDefault("ENVIRONMENT", "local").toString();
+ public static String NAMESPACE = properties.getOrDefault("NAMESPACE", "jetstream-local").toString();
+
+ public static String AWS_REGION = properties.getOrDefault("AWS_REGION", "us-east-1").toString();
+ public static boolean OTEL_SDK_DISABLED = properties.getOrDefault("OTEL_SDK_DISABLED", "true").equals("true");
+ public static String OTEL_SERVICE_NAME = properties.getOrDefault("OTEL_SERVICE_NAME", "jetstream-statefun").toString();
+ public static String OTEL_EXPORTER_OTLP_ENDPOINT = properties.getOrDefault("OTEL_EXPORTER_OTLP_ENDPOINT", "").toString();
+ public static String OTEL_EXPORTER_OTLP_HEADERS = properties.getOrDefault("OTEL_EXPORTER_OTLP_HEADERS", "").toString();
+ public static String OTEL_EXPORTER_OTLP_PROTOCOL = properties.getOrDefault("OTEL_EXPORTER_OTLP_PROTOCOL", "").toString();
+ public static String OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE =
+ properties.getOrDefault("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE","Delta").toString();
+ public static long OTEL_METRIC_EXPORT_INTERVAL =
+ Long.parseLong(properties.getOrDefault("OTEL_METRIC_EXPORT_INTERVAL", "60000").toString());
+ public static long OTEL_METRIC_EXPORT_TIMEOUT =
+ Long.parseLong(properties.getOrDefault("OTEL_METRIC_EXPORT_TIMEOUT", "30000").toString());
+
+
public static String INGRESS_KINESIS_STREAM_NAME = properties.getOrDefault("EVENTS_INGRESS_STREAM_DEFAULT", "example-ingress-stream").toString();
public static String EGRESS_KINESIS_STREAM_NAME = properties.getOrDefault("EVENTS_EGRESS_STREAM_DEFAULT", "example-egress-stream").toString();
public static boolean IS_LOCAL_DEV = properties.getOrDefault("IS_LOCAL_DEV", "false").equals("true");
+
+
public static boolean USE_ENHANCED_FANOUT = properties.getOrDefault("USE_ENHANCED_FANOUT", "true").equals("true");
public static String ENHANCED_FANOUT_NAME = properties.getOrDefault("ENHANCED_FANOUT_NAME", "example-enhanced-fanout").toString();
public static String APP_VERSION = properties.getOrDefault("app.version", "0.1").toString();
+
public static final AwsRegion getAwsRegion() {
+ return getAwsRegion(properties);
+ }
- if (properties.containsKey("AWS_REGION") && !properties.containsKey("AWS_ENDPOINT")){
- return AwsRegion.ofId(properties.get("AWS_REGION").toString());
+ private static AwsRegion getAwsRegion(Properties _properties) {
+
+ if (_properties.containsKey("AWS_REGION") && !_properties.containsKey("AWS_ENDPOINT")){
+ return AwsRegion.ofId(_properties.get("AWS_REGION").toString());
} else {
AwsRegion region = AwsRegion.ofCustomEndpoint(
- properties.getOrDefault("AWS_ENDPOINT", "https://localhost:4566").toString(),
- properties.getOrDefault("AWS_REGION", "us-east-1").toString()
+ _properties.getOrDefault("AWS_ENDPOINT", "https://localhost:4566").toString(),
+ _properties.getOrDefault("AWS_REGION", "us-east-1").toString()
);
try {
@@ -45,10 +70,10 @@ public static final AwsRegion getAwsRegion() {
{
field.set(region, "http://host.docker.internal:4566");
}
- } catch (Exception ex) {
+ }
+ catch (Exception ex) {
}
return region;
-
}
}
@@ -76,7 +101,58 @@ private static Properties getProperties() {
catch (Exception x) {
LOG.warn(x.getMessage(), x);
}
- return properties;
+
+ return resolveSecretValueReferences(properties);
}
+ // Replace property values of the form "!secret:" with the actual secret value from AWS Secrets Manager
+ private static Properties resolveSecretValueReferences(Properties properties) {
+
+ // Find property key/value pairs where the value has a secret reference
+ Map propertiesWithSecretsResolved = new HashMap<>();
+ properties.stringPropertyNames().forEach(propertyName -> {
+ String propertyValue = properties.getProperty(propertyName);
+ if (propertyValue.startsWith("!secret:")) {
+ String secretName = propertyValue.substring(8);
+ propertiesWithSecretsResolved.put(propertyName, secretName);
+ }
+ });
+
+ // Nothing to do if there are no secrets to resolve
+ if (propertiesWithSecretsResolved.isEmpty()) {
+ return properties;
+ }
+
+ String awsRegionId = properties.get("AWS_REGION").toString();
+ LOG.info("Resolving configuration secrets using AWS_REGION={}", awsRegionId);
+
+ // Create a Secrets Manager client
+ try (SecretsManagerClient client = SecretsManagerClient.builder()
+ .region(Region.of(awsRegionId))
+ .build()) {
+
+ // For each property with a secret reference, get the secret value from AWS Secrets Manager and replace
+ // the value in the properties map
+ propertiesWithSecretsResolved.entrySet().forEach(entry -> {
+ String propertyName = entry.getKey();
+ String secretName = entry.getKey();
+
+ GetSecretValueRequest getSecretValueRequest = GetSecretValueRequest.builder()
+ .secretId(secretName)
+ .build();
+
+ try {
+ properties.put(propertyName, client.getSecretValue(getSecretValueRequest).secretString());
+ }
+ catch (Exception e) {
+ LOG.warn("Failed to get secret value for {}", secretName, e);
+ }
+ });
+ }
+ catch (Exception e) {
+ LOG.warn("Failed init SecretsManagerClient: ", e);
+ }
+
+ return properties;
+ }
}
diff --git a/src/main/java/com/example/stateful_functions/Main.java b/src/main/java/com/example/stateful_functions/Main.java
index da6c1ab..820aadf 100644
--- a/src/main/java/com/example/stateful_functions/Main.java
+++ b/src/main/java/com/example/stateful_functions/Main.java
@@ -13,6 +13,11 @@ public static void main(String... args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StatefulFunctionsConfig stateFunConfig = StatefulFunctionsConfig.fromEnvironment(env);
+
+ stateFunConfig.setMetricFunctionNamespaceKey("ns");
+ stateFunConfig.setMetricFunctionTypeKey("type");
+ stateFunConfig.setMetricGroupAdapterFactoryClassName("com.example.stateful_functions.metrics.JetstreamMetricGroupAdapterFactory");
+
stateFunConfig.setProvider((StatefulFunctionsUniverseProvider) (classLoader, statefulFunctionsConfig) -> {
Modules modules = Modules.loadFromClassPath(stateFunConfig);
return modules.createStatefulFunctionsUniverse();
diff --git a/src/main/java/com/example/stateful_functions/function/product/ProductStatefulFunction.java b/src/main/java/com/example/stateful_functions/function/product/ProductStatefulFunction.java
index 7c0a947..a050475 100644
--- a/src/main/java/com/example/stateful_functions/function/product/ProductStatefulFunction.java
+++ b/src/main/java/com/example/stateful_functions/function/product/ProductStatefulFunction.java
@@ -101,11 +101,13 @@ private void handleFunctionSubscriptionEvent(Context context, CloudEvent subscri
FunctionSubscriptionDetails subscriptionDetails = cloudEventDataAccess.toFunctionSubscriptionDetails(subscriptionEvent);
FunctionSubscriber subscriber = FunctionSubscriberUtil.subscriberFromSubscription(subscriptionDetails);
if (subscriptionDetails.getAction() == FunctionSubscriptionAction.UNSUBSCRIBE) {
+ context.metrics().counter("unsubscribed").inc();
subscribers.remove(subscriber.getSubscriberId());
return;
}
if (subscriptionDetails.getAction() == FunctionSubscriptionAction.SUBSCRIBE) {
+ context.metrics().counter("subscribed").inc();
subscribers.set(subscriber.getSubscriberId(), subscriber);
}
@@ -113,6 +115,7 @@ private void handleFunctionSubscriptionEvent(Context context, CloudEvent subscri
}
private void notifySubscriber(Context context, FunctionSubscriber subscriber, CloudEvent productEvent) {
+ context.metrics().counter("notify_subscriber").inc();
send(context, new FunctionType(subscriber.getNamespace(), subscriber.getType()), subscriber.getId(), productEvent);
}
diff --git a/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricCounter.java b/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricCounter.java
new file mode 100644
index 0000000..16c61c4
--- /dev/null
+++ b/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricCounter.java
@@ -0,0 +1,49 @@
+package com.example.stateful_functions.metrics;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongGauge;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+
+public class JetstreamMetricCounter implements Counter {
+
+ private final Counter flinkCounter = new SimpleCounter();
+
+ private final LongGauge otelGauge;
+ private final Attributes otelAttributes;
+
+ public JetstreamMetricCounter(LongGauge otelGauge, Attributes otelAttributes) {
+ this.otelGauge = otelGauge;
+ this.otelAttributes = otelAttributes;
+ }
+
+ @Override
+ public void inc() {
+ flinkCounter.inc();
+ otelGauge.set(flinkCounter.getCount(), otelAttributes);
+ }
+
+ @Override
+ public void inc(long l) {
+ otelGauge.set(flinkCounter.getCount() + l, otelAttributes);
+ flinkCounter.inc(l);
+
+ }
+
+ @Override
+ public void dec() {
+ flinkCounter.dec();
+ otelGauge.set(flinkCounter.getCount(), otelAttributes);
+ }
+
+ @Override
+ public void dec(long l) {
+ otelGauge.set(flinkCounter.getCount() - l, otelAttributes);
+ flinkCounter.dec(l);
+ }
+
+ @Override
+ public long getCount() {
+ return flinkCounter.getCount();
+ }
+}
diff --git a/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricGroupAdapter.java b/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricGroupAdapter.java
new file mode 100644
index 0000000..5759d4a
--- /dev/null
+++ b/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricGroupAdapter.java
@@ -0,0 +1,123 @@
+package com.example.stateful_functions.metrics;
+
+import com.example.stateful_functions.Configuration;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongGauge;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.metrics.*;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Adapter for Flink's MetricGroup which creates wrappers around Counters that also report via OpenTelemetry.
+ */
+public class JetstreamMetricGroupAdapter implements MetricGroup {
+
+ private final MetricGroup adapted;
+ private final OpenTelemetrySdk openTelemetrySdk;
+
+ private static final char METRIC_SCOPE_DELIMITER = '.';
+
+ // Exclude this list of attributes from the OpenTelemetry metrics
+ private static final Set EXCLUDE_ATTRIBUTES = Set.of(
+ "host", // ="10.89.0.23"
+ "job_id", // ="279f294c8e2048d912878c11fd1f1348"
+ "operator_id", // ="6b87a4870d0e21cecbbe271bd893cfcc"
+ // "operator_name", // ="functions"
+ "subtask_index", // ="0"
+ "task_attempt_id", // ="7dfc52028fab716222444f4f29d14905"
+ "task_attempt_num", // ="0"
+ "task_id", // ="31284d56d1e2112b0f20099ee448a6a9"
+ "task_name", // ="feedback-union -> functions -> Sink: rad-output-egress-egress"
+ "tm_id" // ="10.89.0.23:35601-565f93"
+ );
+
+ public JetstreamMetricGroupAdapter(MetricGroup adapted, OpenTelemetrySdk openTelemetrySdk) {
+ this.adapted = adapted;
+ this.openTelemetrySdk = openTelemetrySdk;
+ }
+
+ @Override
+ public Counter counter(String name) {
+
+ final String flinkScope;
+ final Map variables;
+ if (adapted instanceof AbstractMetricGroup) {
+ AbstractMetricGroup> abstractMetricGroup = (AbstractMetricGroup>) adapted;
+ flinkScope = abstractMetricGroup.getLogicalScope(CharacterFilter.NO_OP_FILTER, METRIC_SCOPE_DELIMITER);
+ variables = adapted.getAllVariables().entrySet().stream()
+ .map(entry -> Map.entry(StringUtils.removeEnd(StringUtils.removeStart(entry.getKey(), "<"),">"), entry.getValue()))
+ .filter(entry -> !EXCLUDE_ATTRIBUTES.contains(entry.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+ else {
+ flinkScope = "unresolvable.scope";
+ variables = Map.of("adaptedClass", adapted.getClass().getName());
+ }
+ LongGauge otelGauge = openTelemetrySdk.getMeter(OpenTelemetrySdkUtil.getServiceName())
+ .gaugeBuilder(flinkScope + METRIC_SCOPE_DELIMITER + name)
+ .ofLongs()
+ .build();
+ AttributesBuilder attributesBuilder = Attributes.builder();
+ variables.forEach(attributesBuilder::put);
+ attributesBuilder.put("service.namespace", Configuration.NAMESPACE);
+
+ return counter(name, new JetstreamMetricCounter(otelGauge, attributesBuilder.build()));
+ }
+
+ @Override
+ public C counter(String name, C counter) {
+ return adapted.counter(name, counter);
+ }
+
+ @Override
+ public > G gauge(String name, G g) {
+ return adapted.gauge(name, g);
+ }
+
+ @Override
+ public H histogram(String name, H h) {
+ return adapted.histogram(name, h);
+ }
+
+ @Override
+ public M meter(String name, M m) {
+ return adapted.meter(name, m);
+ }
+
+ @Override
+ public MetricGroup addGroup(String name) {
+ return adapted.addGroup(name);
+ }
+
+ @Override
+ public MetricGroup addGroup(String key, String value) {
+ return adapted.addGroup(key, value);
+ }
+
+ @Override
+ public String[] getScopeComponents() {
+ return adapted.getScopeComponents();
+ }
+
+ @Override
+ public Map getAllVariables() {
+ return adapted.getAllVariables();
+ }
+
+ @Override
+ public String getMetricIdentifier(String metricName) {
+ return adapted.getMetricIdentifier(metricName);
+ }
+
+ @Override
+ public String getMetricIdentifier(String metricName, CharacterFilter characterFilter) {
+ return adapted.getMetricIdentifier(metricName, characterFilter);
+ }
+}
diff --git a/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricGroupAdapterFactory.java b/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricGroupAdapterFactory.java
new file mode 100644
index 0000000..fff819d
--- /dev/null
+++ b/src/main/java/com/example/stateful_functions/metrics/JetstreamMetricGroupAdapterFactory.java
@@ -0,0 +1,34 @@
+package com.example.stateful_functions.metrics;
+
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.statefun.flink.core.metrics.MetricGroupAdapterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JetstreamMetricGroupAdapterFactory implements MetricGroupAdapterFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JetstreamMetricGroupAdapterFactory.class);
+
+ @Override
+ public MetricGroup createMetricGroupAdapter(MetricGroup metricGroup) {
+ try {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ if (classLoader == null) {
+ classLoader = JetstreamMetricGroupAdapterFactory.class.getClassLoader();
+ }
+ Class> sdkUtilClass = classLoader.loadClass("com.example.stateful_functions.metrics.OpenTelemetrySdkUtil");
+ Object sdk = sdkUtilClass.getMethod("getSdk").invoke(null);
+ if (sdk instanceof OpenTelemetrySdk) {
+ LOG.warn("Creating JetstreamMetricGroupAdapter with OpenTelemetry SDK: {}@{}", sdk.getClass().getName(), sdk.hashCode());
+ return new JetstreamMetricGroupAdapter(metricGroup, (OpenTelemetrySdk) sdk);
+ }
+ } catch (Exception x) {
+ LOG.error(x.getMessage(), x);
+ LOG.warn("Returning original MetricGroup due to errors resolving SDK");
+ }
+
+ LOG.warn("OpenTelemetry SDK is disabled. Returning original MetricGroup.");
+ return metricGroup;
+ }
+}
diff --git a/src/main/java/com/example/stateful_functions/metrics/OpenTelemetrySdkUtil.java b/src/main/java/com/example/stateful_functions/metrics/OpenTelemetrySdkUtil.java
new file mode 100644
index 0000000..e6081b4
--- /dev/null
+++ b/src/main/java/com/example/stateful_functions/metrics/OpenTelemetrySdkUtil.java
@@ -0,0 +1,173 @@
+package com.example.stateful_functions.metrics;
+
+import com.example.stateful_functions.Configuration;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.propagation.ContextPropagators;
+import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
+import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import io.opentelemetry.sdk.metrics.export.MetricReader;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import io.opentelemetry.sdk.resources.Resource;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.protocol.types.Field;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.*;
+
+public class OpenTelemetrySdkUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OpenTelemetrySdkUtil.class);
+
+ private static boolean initialized = false;
+ private static OpenTelemetrySdk sdk = null;
+
+ private static OpenTelemetrySdk initialized(OpenTelemetrySdk _sdk) {
+ initialized = true;
+ sdk = _sdk;
+ return sdk;
+ }
+
+ public static String getServiceName() {
+ return Configuration.OTEL_SERVICE_NAME + '-' + Configuration.ENVIRONMENT;
+ }
+
+ public static synchronized OpenTelemetrySdk getSdk() {
+ if (initialized) {
+ return sdk;
+ }
+
+ // In the logging output, obfuscate headers values where 'key' is in the header name
+ String headersToLog;
+ if (StringUtils.isNotEmpty(Configuration.OTEL_EXPORTER_OTLP_HEADERS)) {
+ List items = new ArrayList<>();
+ String[] headerArray = Configuration.OTEL_EXPORTER_OTLP_HEADERS.split(",");
+ for (String header : headerArray) {
+ String[] keyValue = header.split("=");
+ if (keyValue.length == 2) {
+ if (keyValue[0].contains("key")) {
+ items.add(String.format("%s=%s", keyValue[0],
+ new String(Base64.getEncoder().encode(keyValue[1].getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8)));
+ }
+ else {
+ items.add(header);
+ }
+ }
+ else {
+ items.add(header);
+ }
+ }
+ headersToLog = StringUtils.joinWith(",", items);
+ }
+ else {
+ headersToLog = Configuration.OTEL_EXPORTER_OTLP_HEADERS;
+ }
+
+ LOG.warn("Initializing with OTEL_SDK_DISABLED={}, OTEL_EXPORTER_OTLP_PROTOCOL='{}', OTEL_EXPORTER_OTLP_ENDPOINT='{}', OTEL_EXPORTER_OTLP_HEADERS='{}'",
+ Configuration.OTEL_SDK_DISABLED,
+ Configuration.OTEL_EXPORTER_OTLP_PROTOCOL,
+ Configuration.OTEL_EXPORTER_OTLP_ENDPOINT,
+ headersToLog);
+
+ if (Configuration.OTEL_SDK_DISABLED) {
+ LOG.warn("OpenTelemetry SDK is disabled. Skipping initialization.");
+ return initialized(null);
+ }
+
+ if (StringUtils.isEmpty(Configuration.OTEL_EXPORTER_OTLP_PROTOCOL)) {
+ LOG.warn("OTLP protocol is not set. Skipping initialization.");
+ return initialized(null);
+ }
+
+ if (StringUtils.isEmpty(Configuration.OTEL_EXPORTER_OTLP_ENDPOINT)) {
+ LOG.warn("OTLP endpoint is not set. Skipping initialization.");
+ return initialized(null);
+ }
+
+ Resource serviceNameResourceResource = Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), getServiceName()));
+
+ Map headers = new HashMap<>();
+
+ if (StringUtils.isNotEmpty(Configuration.OTEL_EXPORTER_OTLP_HEADERS)) {
+ String[] headerArray = Configuration.OTEL_EXPORTER_OTLP_HEADERS.split(",");
+ for (String header : headerArray) {
+ String[] keyValue = header.split("=");
+ if (keyValue.length == 2) {
+ headers.put(keyValue[0], keyValue[1]);
+ }
+ else {
+ LOG.warn("Header value '{}' parsed to {} token(s). Expected exactly 2", header, keyValue.length);
+ }
+ }
+ }
+
+ final AggregationTemporalitySelector aggregationTemporalitySelector;
+ switch (Configuration.OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE.toUpperCase()) {
+ case "DELTA":
+ aggregationTemporalitySelector = AggregationTemporalitySelector.deltaPreferred();
+ break;
+ case "CUMULATIVE":
+ aggregationTemporalitySelector = AggregationTemporalitySelector.alwaysCumulative();
+ break;
+ case "LOWMEMORY":
+ aggregationTemporalitySelector = AggregationTemporalitySelector.lowMemory();
+ break;
+ default:
+ LOG.warn("Unsupported OTLP metrics temporality preference: '{}'. Defaulting to DELTA.", Configuration.OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE);
+ aggregationTemporalitySelector = AggregationTemporalitySelector.deltaPreferred();
+ break;
+ }
+
+ final MetricExporter metricExporter;
+ final String metricEndpoint = Configuration.OTEL_EXPORTER_OTLP_ENDPOINT + "/v1/metrics";
+ switch (Configuration.OTEL_EXPORTER_OTLP_PROTOCOL) {
+ case "http/protobuf":
+ LOG.info("Using OTLP HTTP/Protobuf");
+ OtlpHttpMetricExporterBuilder builder = OtlpHttpMetricExporter.builder()
+ .setHeaders(() -> headers)
+ .setEndpoint(metricEndpoint)
+ .setTimeout(Duration.ofMillis(Configuration.OTEL_METRIC_EXPORT_TIMEOUT))
+ .setAggregationTemporalitySelector(aggregationTemporalitySelector);
+ metricExporter = builder.build();
+ break;
+ case "grpc":
+ LOG.info("Using OTLP gRPC");
+ metricExporter = OtlpGrpcMetricExporter.builder()
+ .setHeaders(() -> headers)
+ .setEndpoint(metricEndpoint)
+ .setTimeout(Duration.ofMillis(Configuration.OTEL_METRIC_EXPORT_TIMEOUT))
+ .setAggregationTemporalitySelector(aggregationTemporalitySelector)
+ .build();
+ break;
+ default:
+ LOG.warn("Unsupported OTLP protocol: '{}'.", Configuration.OTEL_EXPORTER_OTLP_PROTOCOL);
+ return initialized(null);
+ }
+ MetricReader periodicReader =
+ PeriodicMetricReader.builder(metricExporter)
+ .setInterval(Duration.ofMillis(Configuration.OTEL_METRIC_EXPORT_INTERVAL))
+ .build();
+
+ SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+ .addResource(serviceNameResourceResource)
+ .registerMetricReader(periodicReader)
+ .build();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(sdkMeterProvider::close));
+ sdk =
+ OpenTelemetrySdk.builder()
+ .setMeterProvider(sdkMeterProvider)
+ .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
+ .build();
+ return initialized(sdk);
+ }
+}