# Move marquez.service.SearchService into marquez.service.v2beta as a nested static class
# and point MarquezContext, SearchResource, ServiceFactory and ApiTestUtils at the new type.
#
# Review changes applied to the added file (api/src/main/java/marquez/service/v2beta.java);
# the deleted file and all context lines are untouched:
#   * isEnabled() returns searchConfig.isEnabled(). The moved code negated the flag, which
#     contradicts the constructor and indexEvent(), both of which treat
#     !searchConfig.isEnabled() as "search is disabled".
#     NOTE(review): callers are not visible in this patch - confirm none of them relies on
#     the inverted value.
#   * The Apache-2.0 / SPDX license header from the deleted file is carried over.
#   * The ping failure in the constructor is logged together with its IOException.
#   * Javadoc added to v2beta, v2beta.SearchService and isEnabled().
#   * The new-file hunk header is +1,264 (was +1,252) to account for the 12 added lines; the
#     blob id on its "index" line predates these edits.
# Generic type arguments that were stripped from this patch text (for example
# "IndexRequest> request") are restored.
diff --git a/api/src/main/java/marquez/MarquezContext.java b/api/src/main/java/marquez/MarquezContext.java
index 910a1b6a73..a5a89ac44f 100644
--- a/api/src/main/java/marquez/MarquezContext.java
+++ b/api/src/main/java/marquez/MarquezContext.java
@@ -54,11 +54,11 @@
 import marquez.service.OpenLineageService;
 import marquez.service.RunService;
 import marquez.service.RunTransitionListener;
-import marquez.service.SearchService;
 import marquez.service.ServiceFactory;
 import marquez.service.SourceService;
 import marquez.service.TagService;
 import marquez.service.models.Tag;
+import marquez.service.v2beta;
 import org.jdbi.v3.core.Jdbi;
 
 @Getter
@@ -91,7 +91,7 @@ public final class MarquezContext {
   @Getter private final OpenLineageService openLineageService;
   @Getter private final LineageService lineageService;
   @Getter private final ColumnLineageService columnLineageService;
-  @Getter private final SearchService searchService;
+  @Getter private final v2beta.SearchService searchService;
   @Getter private final NamespaceResource namespaceResource;
   @Getter private final SourceResource sourceResource;
   @Getter private final DatasetResource datasetResource;
@@ -148,7 +148,7 @@ private MarquezContext(
     this.openLineageService = new OpenLineageService(baseDao, runService);
     this.lineageService = new LineageService(lineageDao, jobDao);
     this.columnLineageService = new ColumnLineageService(columnLineageDao, datasetFieldDao);
-    this.searchService = new SearchService(searchConfig);
+    this.searchService = new v2beta.SearchService(searchConfig);
     this.jdbiException = new JdbiExceptionExceptionMapper();
     this.jsonException = new JsonProcessingExceptionMapper();
     final ServiceFactory serviceFactory =
diff --git a/api/src/main/java/marquez/api/v2beta/SearchResource.java b/api/src/main/java/marquez/api/v2beta/SearchResource.java
index 527a50f7e5..71800af599 100644
--- a/api/src/main/java/marquez/api/v2beta/SearchResource.java
+++ b/api/src/main/java/marquez/api/v2beta/SearchResource.java
@@ -26,8 +26,8 @@
 import lombok.NonNull;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-import marquez.service.SearchService;
 import marquez.service.ServiceFactory;
+import marquez.service.v2beta;
 import org.opensearch.client.opensearch.core.SearchResponse;
 import org.opensearch.client.opensearch.core.search.Hit;
 
@@ -35,7 +35,7 @@
 @Path("/api/v2beta/search")
 public class SearchResource {
 
-  private final SearchService searchService;
+  private final v2beta.SearchService searchService;
 
   public SearchResource(@NonNull final ServiceFactory serviceFactory) {
     this.searchService = serviceFactory.getSearchService();
diff --git a/api/src/main/java/marquez/service/SearchService.java b/api/src/main/java/marquez/service/SearchService.java
deleted file mode 100644
index ab7173c570..0000000000
--- a/api/src/main/java/marquez/service/SearchService.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright 2018-2024 contributors to the Marquez project
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package marquez.service;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import javax.validation.Valid;
-import javax.validation.constraints.NotNull;
-import lombok.extern.slf4j.Slf4j;
-import marquez.search.SearchConfig;
-import marquez.service.models.LineageEvent;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.opensearch.client.RestClient;
-import org.opensearch.client.json.jackson.JacksonJsonpMapper;
-import org.opensearch.client.opensearch.OpenSearchClient;
-import org.opensearch.client.opensearch._types.query_dsl.Operator;
-import org.opensearch.client.opensearch._types.query_dsl.TextQueryType;
-import org.opensearch.client.opensearch.core.IndexRequest;
-import org.opensearch.client.opensearch.core.SearchResponse;
-import org.opensearch.client.opensearch.core.search.BuiltinHighlighterType;
-import org.opensearch.client.opensearch.core.search.HighlighterType;
-import org.opensearch.client.transport.OpenSearchTransport;
-import org.opensearch.client.transport.endpoints.BooleanResponse;
-import org.opensearch.client.transport.rest_client.RestClientTransport;
-
-@Slf4j
-public class SearchService {
-
-  String[] DATASET_FIELDS = {
-    "run_id",
-    "name",
-    "namespace",
-    "facets.schema.fields.name",
-    "facets.schema.fields.type",
-    "facets.columnLineage.fields.*.inputFields.name",
-    "facets.columnLineage.fields.*.inputFields.namespace",
-    "facets.columnLineage.fields.*.inputFields.field",
-    "facets.columnLineage.fields.*.transformationDescription",
-    "facets.columnLineage.fields.*.transformationType"
-  };
-
-  String[] JOB_FIELDS = {
-    "facets.sql.query",
-    "facets.sourceCode.sourceCode",
-    "facets.sourceCode.language",
-    "runFacets.processing_engine.name",
-    "run_id",
-    "name",
-    "namespace",
-    "type"
-  };
-
-  private final OpenSearchClient openSearchClient;
-  private final SearchConfig searchConfig;
-
-  public SearchService(SearchConfig searchConfig) {
-    this.searchConfig = searchConfig;
-    if (!searchConfig.isEnabled()) {
-      log.info("Search is disabled, skipping initialization");
-      this.openSearchClient = null;
-      return;
-    }
-    final HttpHost host =
-        new HttpHost(searchConfig.getHost(), searchConfig.getPort(), searchConfig.getScheme());
-    final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-    credentialsProvider.setCredentials(
-        new AuthScope(host),
-        new UsernamePasswordCredentials(searchConfig.getUsername(), searchConfig.getPassword()));
-    final RestClient restClient =
-        RestClient.builder(host)
-            .setHttpClientConfigCallback(
-                httpClientBuilder ->
-                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider))
-            .build();
-
-    JacksonJsonpMapper jsonpMapper = new JacksonJsonpMapper();
-    // register JavaTimeModule to handle ZonedDateTime
-    jsonpMapper.objectMapper().registerModule(new JavaTimeModule());
-    final OpenSearchTransport transport = new RestClientTransport(restClient, jsonpMapper);
-    this.openSearchClient = new OpenSearchClient(transport);
-    BooleanResponse booleanResponse;
-    try {
-      booleanResponse = openSearchClient.ping();
-      log.info("OpenSearch Active: {}", booleanResponse.value());
-    } catch (IOException e) {
-      log.warn("Search not configured");
-    }
-  }
-
-  public OpenSearchClient getClient() {
-    return this.openSearchClient;
-  }
-
-  public SearchResponse<ObjectNode> searchDatasets(String query) throws IOException {
-    return this.openSearchClient.search(
-        s ->
-            s.index("datasets")
-                .query(
-                    q ->
-                        q.multiMatch(
-                            m ->
-                                m.query(query)
-                                    .type(TextQueryType.PhrasePrefix)
-                                    .fields(Arrays.stream(DATASET_FIELDS).toList())
-                                    .operator(Operator.Or)))
-                .highlight(
-                    hl -> {
-                      for (String field : DATASET_FIELDS) {
-                        hl.fields(
-                            field,
-                            f ->
-                                f.type(
-                                    HighlighterType.of(
-                                        fn -> fn.builtin(BuiltinHighlighterType.Plain))));
-                      }
-                      return hl;
-                    }),
-        ObjectNode.class);
-  }
-
-  public SearchResponse<ObjectNode> searchJobs(String query) throws IOException {
-    return this.openSearchClient.search(
-        s -> {
-          s.index("jobs")
-              .query(
-                  q ->
-                      q.multiMatch(
-                          m ->
-                              m.query(query)
-                                  .type(TextQueryType.PhrasePrefix)
-                                  .fields(Arrays.stream(JOB_FIELDS).toList())
-                                  .operator(Operator.Or)));
-          s.highlight(
-              hl -> {
-                for (String field : JOB_FIELDS) {
-                  hl.fields(
-                      field,
-                      f ->
-                          f.type(
-                              HighlighterType.of(fn -> fn.builtin(BuiltinHighlighterType.Plain))));
-                }
-                return hl;
-              });
-          return s;
-        },
-        ObjectNode.class);
-  }
-
-  public void indexEvent(@Valid @NotNull LineageEvent event) {
-    if (!searchConfig.isEnabled()) {
-      log.debug("Search is disabled, skipping indexing");
-      return;
-    }
-    UUID runUuid = runUuidFromEvent(event.getRun());
-    log.debug("Indexing event {}", event);
-
-    if (event.getInputs() != null) {
-      indexDatasets(event.getInputs(), runUuid, event);
-    }
-    if (event.getOutputs() != null) {
-      indexDatasets(event.getOutputs(), runUuid, event);
-    }
-    indexJob(runUuid, event);
-  }
-
-  private UUID runUuidFromEvent(LineageEvent.Run run) {
-    UUID runUuid;
-    try {
-      runUuid = UUID.fromString(run.getRunId());
-    } catch (Exception e) {
-      runUuid = UUID.nameUUIDFromBytes(run.getRunId().getBytes(StandardCharsets.UTF_8));
-    }
-    return runUuid;
-  }
-
-  private Map<String, Object> buildJobIndexRequest(UUID runUuid, LineageEvent event) {
-    Map<String, Object> jsonMap = new HashMap<>();
-
-    jsonMap.put("run_id", runUuid.toString());
-    jsonMap.put("eventType", event.getEventType());
-    jsonMap.put("name", event.getJob().getName());
-    jsonMap.put("type", event.getJob().isStreamingJob() ? "STREAM" : "BATCH");
-    jsonMap.put("namespace", event.getJob().getNamespace());
-    jsonMap.put("facets", event.getJob().getFacets());
-    jsonMap.put("runFacets", event.getRun().getFacets());
-    return jsonMap;
-  }
-
-  private Map<String, Object> buildDatasetIndexRequest(
-      UUID runUuid, LineageEvent.Dataset dataset, LineageEvent event) {
-    Map<String, Object> jsonMap = new HashMap<>();
-    jsonMap.put("run_id", runUuid.toString());
-    jsonMap.put("eventType", event.getEventType());
-    jsonMap.put("name", dataset.getName());
-    jsonMap.put("inputFacets", dataset.getInputFacets());
-    jsonMap.put("outputFacets", dataset.getOutputFacets());
-    jsonMap.put("namespace", dataset.getNamespace());
-    jsonMap.put("facets", dataset.getFacets());
-    return jsonMap;
-  }
-
-  private void indexJob(UUID runUuid, LineageEvent event) {
-    index(
-        IndexRequest.of(
-            i ->
-                i.index("jobs")
-                    .id(
-                        String.format(
-                            "JOB:%s:%s", event.getJob().getNamespace(), event.getJob().getName()))
-                    .document(buildJobIndexRequest(runUuid, event))));
-  }
-
-  private void indexDatasets(
-      List<LineageEvent.Dataset> datasets, UUID runUuid, LineageEvent event) {
-    datasets.stream()
-        .map(dataset -> buildDatasetIndexRequest(runUuid, dataset, event))
-        .forEach(
-            jsonMap ->
-                index(
-                    IndexRequest.of(
-                        i ->
-                            i.index("datasets")
-                                .id(
-                                    String.format(
-                                        "DATASET:%s:%s",
-                                        jsonMap.get("namespace"), jsonMap.get("name")))
-                                .document(jsonMap))));
-  }
-
-  private void index(IndexRequest<Map<String, Object>> request) {
-    try {
-      this.openSearchClient.index(request);
-    } catch (IOException e) {
-      log.error("Failed to index event OpenSearch not available.", e);
-    }
-  }
-
-  public boolean isEnabled() {
-    return !searchConfig.isEnabled();
-  }
-}
diff --git a/api/src/main/java/marquez/service/ServiceFactory.java b/api/src/main/java/marquez/service/ServiceFactory.java
index 86d067bf0f..5b0b986c50 100644
--- a/api/src/main/java/marquez/service/ServiceFactory.java
+++ b/api/src/main/java/marquez/service/ServiceFactory.java
@@ -23,5 +23,5 @@ public class ServiceFactory {
   @NonNull DatasetFieldService datasetFieldService;
   @NonNull LineageService lineageService;
   @NonNull ColumnLineageService columnLineageService;
-  @NonNull SearchService searchService;
+  @NonNull v2beta.SearchService searchService;
 }
diff --git a/api/src/main/java/marquez/service/v2beta.java b/api/src/main/java/marquez/service/v2beta.java
new file mode 100644
index 0000000000..406a2914c3
--- /dev/null
+++ b/api/src/main/java/marquez/service/v2beta.java
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2018-2024 contributors to the Marquez project
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package marquez.service;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import marquez.search.SearchConfig;
+import marquez.service.models.LineageEvent;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.json.jackson.JacksonJsonpMapper;
+import org.opensearch.client.opensearch.OpenSearchClient;
+import org.opensearch.client.opensearch._types.query_dsl.Operator;
+import org.opensearch.client.opensearch._types.query_dsl.TextQueryType;
+import org.opensearch.client.opensearch.core.IndexRequest;
+import org.opensearch.client.opensearch.core.SearchResponse;
+import org.opensearch.client.opensearch.core.search.BuiltinHighlighterType;
+import org.opensearch.client.opensearch.core.search.HighlighterType;
+import org.opensearch.client.transport.OpenSearchTransport;
+import org.opensearch.client.transport.endpoints.BooleanResponse;
+import org.opensearch.client.transport.rest_client.RestClientTransport;
+
+/** Holder for the v2beta service implementations; currently only {@link SearchService}. */
+public class v2beta {
+  /**
+   * Indexes lineage events into the {@code jobs} and {@code datasets} OpenSearch indices and runs
+   * phrase-prefix multi-match queries over them. When search is disabled in {@link SearchConfig},
+   * no client is created and {@link #indexEvent} returns without indexing.
+   */
+  @Slf4j
+  public static class SearchService {
+
+    String[] DATASET_FIELDS = {
+      "run_id",
+      "name",
+      "namespace",
+      "facets.schema.fields.name",
+      "facets.schema.fields.type",
+      "facets.columnLineage.fields.*.inputFields.name",
+      "facets.columnLineage.fields.*.inputFields.namespace",
+      "facets.columnLineage.fields.*.inputFields.field",
+      "facets.columnLineage.fields.*.transformationDescription",
+      "facets.columnLineage.fields.*.transformationType"
+    };
+
+    String[] JOB_FIELDS = {
+      "facets.sql.query",
+      "facets.sourceCode.sourceCode",
+      "facets.sourceCode.language",
+      "runFacets.processing_engine.name",
+      "run_id",
+      "name",
+      "namespace",
+      "type"
+    };
+
+    private final OpenSearchClient openSearchClient;
+    private final SearchConfig searchConfig;
+
+    public SearchService(SearchConfig searchConfig) {
+      this.searchConfig = searchConfig;
+      if (!searchConfig.isEnabled()) {
+        log.info("Search is disabled, skipping initialization");
+        this.openSearchClient = null;
+        return;
+      }
+      final HttpHost host =
+          new HttpHost(searchConfig.getHost(), searchConfig.getPort(), searchConfig.getScheme());
+      final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+      credentialsProvider.setCredentials(
+          new AuthScope(host),
+          new UsernamePasswordCredentials(searchConfig.getUsername(), searchConfig.getPassword()));
+      final RestClient restClient =
+          RestClient.builder(host)
+              .setHttpClientConfigCallback(
+                  httpClientBuilder ->
+                      httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider))
+              .build();
+
+      JacksonJsonpMapper jsonpMapper = new JacksonJsonpMapper();
+      // register JavaTimeModule to handle ZonedDateTime
+      jsonpMapper.objectMapper().registerModule(new JavaTimeModule());
+      final OpenSearchTransport transport = new RestClientTransport(restClient, jsonpMapper);
+      this.openSearchClient = new OpenSearchClient(transport);
+      BooleanResponse booleanResponse;
+      try {
+        booleanResponse = openSearchClient.ping();
+        log.info("OpenSearch Active: {}", booleanResponse.value());
+      } catch (IOException e) {
+        log.warn("Search not configured", e);
+      }
+    }
+
+    public OpenSearchClient getClient() {
+      return this.openSearchClient;
+    }
+
+    public SearchResponse<ObjectNode> searchDatasets(String query) throws IOException {
+      return this.openSearchClient.search(
+          s ->
+              s.index("datasets")
+                  .query(
+                      q ->
+                          q.multiMatch(
+                              m ->
+                                  m.query(query)
+                                      .type(TextQueryType.PhrasePrefix)
+                                      .fields(Arrays.stream(DATASET_FIELDS).toList())
+                                      .operator(Operator.Or)))
+                  .highlight(
+                      hl -> {
+                        for (String field : DATASET_FIELDS) {
+                          hl.fields(
+                              field,
+                              f ->
+                                  f.type(
+                                      HighlighterType.of(
+                                          fn -> fn.builtin(BuiltinHighlighterType.Plain))));
+                        }
+                        return hl;
+                      }),
+          ObjectNode.class);
+    }
+
+    public SearchResponse<ObjectNode> searchJobs(String query) throws IOException {
+      return this.openSearchClient.search(
+          s -> {
+            s.index("jobs")
+                .query(
+                    q ->
+                        q.multiMatch(
+                            m ->
+                                m.query(query)
+                                    .type(TextQueryType.PhrasePrefix)
+                                    .fields(Arrays.stream(JOB_FIELDS).toList())
+                                    .operator(Operator.Or)));
+            s.highlight(
+                hl -> {
+                  for (String field : JOB_FIELDS) {
+                    hl.fields(
+                        field,
+                        f ->
+                            f.type(
+                                HighlighterType.of(
+                                    fn -> fn.builtin(BuiltinHighlighterType.Plain))));
+                  }
+                  return hl;
+                });
+            return s;
+          },
+          ObjectNode.class);
+    }
+
+    public void indexEvent(@Valid @NotNull LineageEvent event) {
+      if (!searchConfig.isEnabled()) {
+        log.debug("Search is disabled, skipping indexing");
+        return;
+      }
+      UUID runUuid = runUuidFromEvent(event.getRun());
+      log.debug("Indexing event {}", event);
+
+      if (event.getInputs() != null) {
+        indexDatasets(event.getInputs(), runUuid, event);
+      }
+      if (event.getOutputs() != null) {
+        indexDatasets(event.getOutputs(), runUuid, event);
+      }
+      indexJob(runUuid, event);
+    }
+
+    private UUID runUuidFromEvent(LineageEvent.Run run) {
+      UUID runUuid;
+      try {
+        runUuid = UUID.fromString(run.getRunId());
+      } catch (Exception e) {
+        runUuid = UUID.nameUUIDFromBytes(run.getRunId().getBytes(StandardCharsets.UTF_8));
+      }
+      return runUuid;
+    }
+
+    private Map<String, Object> buildJobIndexRequest(UUID runUuid, LineageEvent event) {
+      Map<String, Object> jsonMap = new HashMap<>();
+
+      jsonMap.put("run_id", runUuid.toString());
+      jsonMap.put("eventType", event.getEventType());
+      jsonMap.put("name", event.getJob().getName());
+      jsonMap.put("type", event.getJob().isStreamingJob() ? "STREAM" : "BATCH");
+      jsonMap.put("namespace", event.getJob().getNamespace());
+      jsonMap.put("facets", event.getJob().getFacets());
+      jsonMap.put("runFacets", event.getRun().getFacets());
+      return jsonMap;
+    }
+
+    private Map<String, Object> buildDatasetIndexRequest(
+        UUID runUuid, LineageEvent.Dataset dataset, LineageEvent event) {
+      Map<String, Object> jsonMap = new HashMap<>();
+      jsonMap.put("run_id", runUuid.toString());
+      jsonMap.put("eventType", event.getEventType());
+      jsonMap.put("name", dataset.getName());
+      jsonMap.put("inputFacets", dataset.getInputFacets());
+      jsonMap.put("outputFacets", dataset.getOutputFacets());
+      jsonMap.put("namespace", dataset.getNamespace());
+      jsonMap.put("facets", dataset.getFacets());
+      return jsonMap;
+    }
+
+    private void indexJob(UUID runUuid, LineageEvent event) {
+      index(
+          IndexRequest.of(
+              i ->
+                  i.index("jobs")
+                      .id(
+                          String.format(
+                              "JOB:%s:%s", event.getJob().getNamespace(), event.getJob().getName()))
+                      .document(buildJobIndexRequest(runUuid, event))));
+    }
+
+    private void indexDatasets(
+        List<LineageEvent.Dataset> datasets, UUID runUuid, LineageEvent event) {
+      datasets.stream()
+          .map(dataset -> buildDatasetIndexRequest(runUuid, dataset, event))
+          .forEach(
+              jsonMap ->
+                  index(
+                      IndexRequest.of(
+                          i ->
+                              i.index("datasets")
+                                  .id(
+                                      String.format(
+                                          "DATASET:%s:%s",
+                                          jsonMap.get("namespace"), jsonMap.get("name")))
+                                  .document(jsonMap))));
+    }
+
+    private void index(IndexRequest<Map<String, Object>> request) {
+      try {
+        this.openSearchClient.index(request);
+      } catch (IOException e) {
+        log.error("Failed to index event OpenSearch not available.", e);
+      }
+    }
+
+    /** Returns {@code true} when search is enabled in the supplied {@link SearchConfig}. */
+    public boolean isEnabled() {
+      return searchConfig.isEnabled();
+    }
+  }
+}
diff --git a/api/src/test/java/marquez/api/ApiTestUtils.java b/api/src/test/java/marquez/api/ApiTestUtils.java
index 3cf2c2c0b3..267c0a7248 100644
--- a/api/src/test/java/marquez/api/ApiTestUtils.java
+++ b/api/src/test/java/marquez/api/ApiTestUtils.java
@@ -17,10 +17,10 @@
 import marquez.service.NamespaceService;
 import marquez.service.OpenLineageService;
 import marquez.service.RunService;
-import marquez.service.SearchService;
 import marquez.service.ServiceFactory;
 import marquez.service.SourceService;
 import marquez.service.TagService;
+import marquez.service.v2beta;
 
 public class ApiTestUtils {
 
@@ -59,7 +59,8 @@ public static ServiceFactory mockServiceFactory(Map mocks) {
         .datasetService(
             (DatasetService) mocks.getOrDefault(DatasetService.class, (mock(DatasetService.class))))
         .searchService(
-            (SearchService) mocks.getOrDefault(SearchService.class, (mock(SearchService.class))))
+            (v2beta.SearchService)
+                mocks.getOrDefault(v2beta.SearchService.class, (mock(v2beta.SearchService.class))))
         .build();
   }
 }