Changes from all commits (28 commits):
- `98492b8` Running via AWS Managed Apache Flink works (kellinwood, Apr 27, 2024)
- `48a4b37` Updated CloudFormation templates. Includes custom resource for progr… (kellinwood, May 8, 2024)
- `cd74e6d` Remove unnecessary manual steps that are now handled in the custom re… (kellinwood, May 8, 2024)
- `d295854` Add app version to egressed events, run Flink 1.18 locally, and on Ap… (kellinwood, Oct 29, 2024)
- `fed64c5` Updated to show how to provision AWS Managed Flink via crossplane (kellinwood, Dec 5, 2024)
- `885da7d` Remove creds (kellinwood, Dec 5, 2024)
- `8d4f2c2` Fix broken formatting in README.md (kellinwood, Dec 5, 2024)
- `8f3cf75` Fix issues in README (kellinwood, Dec 5, 2024)
- `7b4bb91` Provision via Terraform and other minor changes (kellinwood, Dec 12, 2024)
- `b62cb74` README updates (kellinwood, Dec 12, 2024)
- `4f55f90` Enable snapshots (kellinwood, Dec 12, 2024)
- `2fa3724` Drop "2" from demo name, dont set startApplication by default (kellinwood, Dec 12, 2024)
- `7f2ba30` Don't include startApplication by default (kellinwood, Dec 12, 2024)
- `88fa325` Update README w/ latest info, +tweaks (kellinwood, Dec 12, 2024)
- `f60ffb3` More README fixes (kellinwood, Dec 13, 2024)
- `06bc543` README updates (kellinwood, Dec 13, 2024)
- `6fd45de` Cleanup and various fixes (kellinwood, Jan 2, 2025)
- `0e48580` Fix path to update_credentials.sh (kellinwood, Jan 2, 2025)
- `b02a693` More README fixes (kellinwood, Jan 3, 2025)
- `78d8e62` Remove unused providers (kellinwood, Jan 3, 2025)
- `5822863` Rename ./cloud back to ./local, composition function v0.1 (kellinwood, Jan 9, 2025)
- `80fad54` Add EnvironmentConfig w/ AWS account ID placeholder (kellinwood, Jan 15, 2025)
- `76dd68b` More refactoring and cleanup (kellinwood, Jan 16, 2025)
- `fcfa04b` Use function-auto-ready for the ready status in the managed flink XR (kellinwood, Jan 17, 2025)
- `5995737` Enable debug output for the aws kinesisanalyticsv2 provider (kellinwood, Jan 21, 2025)
- `232cdd9` Remove setting of cloudWatchLoggingOptions and previous update loop w… (kellinwood, Jan 21, 2025)
- `86482dd` Minor fixes, try latest upbound/provider version (kellinwood, Apr 9, 2025)
- `dcc8762` added dependencies AWS wants in the jar (nathantippy-il, Apr 22, 2025)
8 changes: 7 additions & 1 deletion .gitignore
```diff
@@ -2,4 +2,10 @@
 target
 dependency-reduced-pom.xml
 *~
-.*~
+.*~
+.cwlogs
+.next-shard-iterator
+.DS_Store
+venv*
+.terraform*
+terraform.tfstate*
```
14 changes: 7 additions & 7 deletions Dockerfile
```diff
@@ -1,14 +1,14 @@
-# The parent Flink image (flink:1.13.2-scala_2.12-java11) only contains the JRE (openjdk:11-jre), and it is missing key
+# The parent Flink image (flink:1.18.1-java11) only contains the JRE (openjdk:11-jre), and it is missing key
 # diagnostic tools. This multistage build will overwrite the JRE with the JDK from openjdk:11
 # See https://docs.docker.com/develop/develop-images/multistage-build/
-FROM openjdk:11 as jdk_image
-FROM flink:1.16.2-java11
+FROM --platform=linux/amd64 openjdk:11 AS jdk_image
+FROM --platform=linux/amd64 flink:1.18.1-java11
 
 # Copy the JDK from the jdk_image
-COPY --from=jdk_image /usr/local/openjdk-11 /usr/local/openjdk-11
+COPY --from=jdk_image /usr/local/openjdk-11 /opt/java/openjdk/
 
-RUN sed -i -e 's/^.*networkaddress.cache.ttl=.*$/networkaddress.cache.ttl=30/g' /usr/local/openjdk-11/conf/security/java.security
-RUN sed -i -e 's/^.*networkaddress.cache.negative.ttl=.*$/networkaddress.cache.negative.ttl=10/g' /usr/local/openjdk-11/conf/security/java.security
+RUN sed -i -e 's/^.*networkaddress.cache.ttl=.*$/networkaddress.cache.ttl=30/g' /opt/java/openjdk/conf/security/java.security
+RUN sed -i -e 's/^.*networkaddress.cache.negative.ttl=.*$/networkaddress.cache.negative.ttl=10/g' /opt/java/openjdk/conf/security/java.security
 
 # The 2019 AWS rds root cert
 ADD rds-ca-2019-root.pem /etc/rds-ca-2019-root.pem
@@ -43,7 +43,7 @@ RUN mkdir -p $FLINK_JOB_DIR
 COPY target/my-stateful-functions-embedded-java-3.3.0.jar ${FLINK_JOB_DIR}/flink-job.jar
 RUN chown -R flink:flink ${FLINK_JOB_DIR}/
 
-ENV PLUGIN_NAME flink-s3-fs-hadoop-1.16.2
+ENV PLUGIN_NAME flink-s3-fs-hadoop-1.18.1
 RUN mkdir -p "${FLINK_HOME}/plugins/${PLUGIN_NAME}"
 RUN ln -fs "${FLINK_HOME}/opt/${PLUGIN_NAME}.jar" "${FLINK_HOME}/plugins/${PLUGIN_NAME}"
```
275 changes: 266 additions & 9 deletions README.md
@@ -11,9 +11,9 @@ The purpose of this project is two-fold.
[Caliper](https://www.imsglobal.org/spec/caliper/v1p2) events.
* This project egresses results as events to a separate stream, whereas at Imagine Learning we mostly send our
results directly to OpenSearch and occasionally write events back to the ingress stream.
2. It serves as the basis for an evaluation of Stateful Functions running on
[AWS Managed Flink](https://docs.aws.amazon.com/managed-flink/). At the time of
this writing Imagine Learning runs Flink Stateful Functions on self-managed Kubernetes clusters, but we are looking to
see if AWS Managed Flink is a viable alternative.


@@ -23,7 +23,8 @@ This project demonstrates stateful functions under test in various ways:
* run-time execution in standalone job mode via docker compose


The project implements embedded functions (functions that execute in the Flink taskmanagers). Remote functions are
future work.

This is an opinionated project. It uses...
* Spring Framework for dependency injection
@@ -38,7 +39,7 @@ This is an opinionated project. It uses...
Each forwarder is a small piece of code that routes one or more specific event types
to a stateful function. To start routing a new event type, just implement another Forwarder.

## What this Stateful Functions application does
Example events and functions are provided which demonstrate notifying a shopping cart service of
product price and availability changes for items in users' carts. The project assumes the
existence of upstream microservices that send Product events (name,price,availability) and
@@ -91,22 +92,24 @@ Users running on Apple silicon should ensure that the file ~/.m2/settings.xml ex
</settings>
```

To compile the code and run the tests using the included Maven wrapper script, first see below about
building and installing Apache Flink Stateful Functions compatible with Flink 1.18, then do this:
```
./mvnw test
```

## Running the project locally via Docker Compose

Follow the instructions below to run the project via Docker Compose. Note that Kinesis support is provided
by a [localstack](https://www.localstack.cloud/) container.

The demo works using three docker compose "profiles" (phases).
1. In the first phase, the flink cluster running our stateful function application is started,
   along with localstack, and an aws-cli container that creates the ingress and egress Kinesis streams.
2. The second phase runs an aws-cli container to send events to the ingress stream. The events
   sent are from [product-cart-integration-test-events.jsonl](./src/test/resources/product-cart-integration-test-events.jsonl).
3. The third phase runs an aws-cli container to fetch the events from the egress stream and output them to the console.
```shell
# Build this project and create the jar file
./mvnw package
# (diff collapsed here: @@ -131,3 +134,257 @@)
docker compose --profile get-egress-events up
docker compose --profile all down
```

## Running the project via AWS Managed Flink

### Version compatibility between AWS Managed Flink and Stateful Functions

The latest release of Apache Flink Stateful Functions is 3.3, but it is compiled and built
to run with Flink 1.16.2. AWS Managed Flink currently supports Flink versions 1.15 and 1.18, so the first
step towards running via AWS Managed Flink is to create a version of the Stateful Functions library
compatible with Flink 1.18. The required changes are provided here:
https://github.com/kellinwood/flink-statefun/pull/1/files.
Clone that repo, check out the `release-3.3-1.18`
branch, and build/install it locally via `mvn install`.
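
For example, the full sequence looks roughly like this (a sketch; `-DskipTests` is an optional shortcut, not a requirement from the PR):
```shell
git clone https://github.com/kellinwood/flink-statefun.git
cd flink-statefun
git checkout release-3.3-1.18
mvn install -DskipTests   # installs the 1.18-compatible statefun artifacts into ~/.m2
```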

### Build and package this project
```shell
mvn package
```

The demo can be provisioned in AWS in three ways: via CloudFormation, Terraform, or Crossplane.

### Provisioning via AWS CloudFormation

The templates and scripts used for provisioning the AWS resources via CloudFormation are in the [aws-cloudformation](./aws-cloudformation) directory.
```
cd aws-cloudformation
```

#### Create an S3 bucket and upload this project's JAR file

To create the bucket, create a CloudFormation stack named `flink-cf-demo-bucket` as defined [here](./aws-cloudformation/flink-cf-demo-bucket-stack.yaml),
and after that finishes, use the AWS CLI to upload the jar file:

```shell
export AWS_ACCOUNT_ID=516535517513 # Imagine Learning Sandbox account
aws s3 cp ../target/my-stateful-functions-embedded-java-3.3.0.jar s3://flink-cf-demo-bucket-${AWS_ACCOUNT_ID}/
```

#### Create the Kinesis streams, Managed Flink application, and related AWS Resources

Create a CloudFormation stack named `flink-cf-demo` as defined by the CloudFormation templates [here](./aws-cloudformation/flink-cf-demo-stack.yaml).
This stack includes a custom resource lambda that programmatically configures logging when the Flink application is created,
and transitions the application from the Ready to Running state.
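
The stack can be created from the AWS console, or with the AWS CLI along these lines (a sketch; it assumes your credentials and region are configured, and that `CAPABILITY_IAM` is required because the stack creates IAM roles for the application and the custom-resource lambda):
```shell
aws cloudformation create-stack \
    --stack-name flink-cf-demo \
    --template-body file://flink-cf-demo-stack.yaml \
    --capabilities CAPABILITY_IAM
# Block until stack creation completes (or fails)
aws cloudformation wait stack-create-complete --stack-name flink-cf-demo
```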


#### Monitor the CloudWatch logging output

The following script will show all the log entries from the start of application launch, and will
wait for new entries to arrive and display them too. The script will resume from where it
left off if shut down via Ctrl-C. To start from scratch, remove the `.cwlogs` directory.
```shell
./demo-tail-logs.sh
```

#### Send sample events to the ingress stream
```shell
./demo-send-events.sh
```

#### Get and display the events published to the egress stream
This script will show all events published to the egress stream since the start of application launch, and will
wait for new entries to arrive and display them too.
```shell
./demo-tail-egress.sh
```
#### Cleanup
Clean up by manually deleting the jar file from the S3 bucket and the ingress Kinesis stream, then delete the
CloudFormation stacks. CloudFormation will fail to delete a non-empty bucket, and will fail to delete the ingress Kinesis
stream because Flink adds a fanout consumer to the stream, which blocks the deletion attempted by
CloudFormation.
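
As with the Terraform flow below, this cleanup can also be scripted. A sketch, which looks up the generated ingress stream name the same way `demo-send-events.sh` does (the bucket and stack names are taken from this demo's conventions):
```shell
export AWS_ACCOUNT_ID=516535517513 # Imagine Learning Sandbox account
aws s3 rm --recursive s3://flink-cf-demo-bucket-${AWS_ACCOUNT_ID}/
# The ingress stream name is generated by CloudFormation, so look it up first
ingress_stream=$(aws kinesis list-streams | jq -crM '.StreamNames[]' | grep FlinkCfDemoIngressStream)
aws kinesis delete-stream --enforce-consumer-deletion --stream-name ${ingress_stream}
aws cloudformation delete-stack --stack-name flink-cf-demo
aws cloudformation delete-stack --stack-name flink-cf-demo-bucket
```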

### Provisioning via Terraform

Requires installing the [Terraform CLI](https://developer.hashicorp.com/terraform/tutorials/aws-get-started/install-cli)

Steps

```shell
cd aws-terraform
# Configure your AWS profile, set AWS env vars, or run 'aws configure sso', etc
terraform init
terraform apply # When prompted, enter 'yes'
```
Immediately after entering 'yes' to the prompt issued by `terraform apply`, switch to another shell/terminal tab and
upload the application JAR file to the S3 bucket. The upload may fail if the S3 bucket has not been created by
Terraform yet, so keep trying until it succeeds.

```shell
export AWS_ACCOUNT_ID=516535517513 # Imagine Learning Sandbox account
aws s3 cp ../target/my-stateful-functions-embedded-java-3.3.0.jar \
s3://flink-tf-demo-bucket-${AWS_ACCOUNT_ID}/
```
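
To avoid re-running the upload by hand, a simple retry loop (an illustrative sketch) also works:
```shell
export AWS_ACCOUNT_ID=516535517513 # Imagine Learning Sandbox account
# Retry until the bucket exists and the upload succeeds
until aws s3 cp ../target/my-stateful-functions-embedded-java-3.3.0.jar \
        s3://flink-tf-demo-bucket-${AWS_ACCOUNT_ID}/ 2>/dev/null; do
    echo "Bucket not ready yet, retrying in 5 seconds..."
    sleep 5
done
```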
Wait for the `terraform apply` command to complete.

#### Monitor the CloudWatch logging output

The following script will show all the log entries from the start of application launch, and will
wait for new entries to arrive and display them too. The script will resume from where it
left off if shut down via Ctrl-C. To start from scratch, remove the `.cwlogs` directory.
```shell
./demo-tail-logs.sh
```

#### Send sample events to the ingress stream
```shell
./demo-send-events.sh
```

#### Get and display the events published to the egress stream
This script will show all events published to the egress stream since the start of application launch, and will
wait for new entries to arrive and display them too.
```shell
./demo-tail-egress.sh
```

#### Cleanup
Clean up by manually deleting the jar file from the S3 bucket, `flink-tf-demo-bucket-${AWS_ACCOUNT_ID}`, and the Kinesis
stream `flink-tf-demo-ingress`, then run `terraform destroy`. The manual deletions are required
because Terraform can't delete a non-empty bucket, and can't delete the ingress stream since Flink adds a fanout consumer
to the stream, which blocks the deletion attempted by Terraform.

Alternatively, you can run the following commands to clean up the resources:
```shell
export AWS_ACCOUNT_ID=516535517513 # Imagine Learning Sandbox account
aws s3 rm --recursive s3://flink-tf-demo-bucket-${AWS_ACCOUNT_ID}/
aws kinesis delete-stream --enforce-consumer-deletion --stream-name flink-tf-demo-ingress
terraform destroy # When prompted, enter 'yes'
```

### Provisioning via Crossplane

#### Prerequisites
- Docker
- idpbuilder (https://github.com/cnoe-io/idpbuilder)
- kubectl
- jq
- Go

#### Introduction
This demo demonstrates that it is possible to provision and run an AWS Managed Flink application via Crossplane. Many
tasks normally performed via CI/CD must be completed manually as described below. The compositions for S3 buckets and
Kinesis streams currently use `function-patch-and-transform`, but the Managed Flink composition uses a custom function.


#### Instructions

The files to run the crossplane demo are in the [aws-crossplane](./aws-crossplane) directory.
```
cd aws-crossplane
```

##### Update the AWS credentials in the local environment

Log in to AWS Identity Center, and copy the AWS credential environment variable commands from the Access Keys page.

Paste and execute the AWS environment variable commands.

Set the AWS_ACCOUNT environment variable to your AWS account number, or run `aws configure sso` / `aws sso login`;
setting the account number explicitly is optional when the account can instead be determined via `aws sts get-caller-identity`.

Finally, run the following script to update the AWS credentials for the local environment:
```shell
./local/aws/update_credentials.sh
```
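
Alternatively, derive the account number from the active credentials before running the script (a sketch; `--query` and `--output` are standard AWS CLI flags):
```shell
# Derive AWS_ACCOUNT from the active credentials, then refresh the local environment
export AWS_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
./local/aws/update_credentials.sh
```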

##### Launch and configure a Kubernetes cluster using the "idpbuilder" tool

Run `./launch-and-config-idp.sh`

This script will launch a local Kubernetes cluster using `kind`, and configure the cluster with the necessary
Crossplane providers and resources. It also builds and uploads the docker image for the Managed Flink composition
function.

The script takes a few minutes to complete.

##### Provision AWS Managed Flink via Crossplane claims

Provision the S3 bucket and Kinesis streams...
```
kubectl apply -f claims/demo-setup-claims.yaml
```

Wait for the resources to become synchronized and ready by checking the output of the following command:
```
kubectl get managed
```
The output of `kubectl get managed` will reveal the actual S3 bucket name under `EXTERNAL-NAME`.
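
To avoid re-running the check by hand, a polling loop along these lines can help (a sketch; it assumes SYNCED and READY are the second and third columns of `kubectl get managed` output, consistent with the `awk '{print $4}'` lookup of EXTERNAL-NAME used below):
```shell
# Poll until every managed resource reports SYNCED=True and READY=True
while kubectl get managed --no-headers | awk '$2 != "True" || $3 != "True"' | grep -q .; do
    echo "Waiting for managed resources..."
    sleep 10
done
kubectl get managed
```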

Return to AWS Identity Center and launch the web console for the account.

Visit the S3 service page. Find the S3 bucket (flink-cp-demo-bucket-*) and upload the following file to the bucket:
- `../target/my-stateful-functions-embedded-java-3.3.0.jar` (Flink demo application code)

Alternatively, use the AWS CLI to upload the file...
```
flink_bucket_name=$(kubectl get managed | grep bucket | awk '{print $4}')
aws s3 cp ../target/my-stateful-functions-embedded-java-3.3.0.jar s3://${flink_bucket_name}/
```

##### Provision the Managed Flink application

Apply the following claim to trigger the creation of the Flink application, its role, log group, and log stream.
Note that at the time of this writing, the Flink application is not configured with the log stream, to work around a bug in
the Crossplane provider (https://github.com/crossplane-contrib/provider-upjet-aws/issues/1419).

```
kubectl apply -f claims/managed-flink-claim.yaml
```


Wait until the Flink application is in the 'Running' state. This may take a few minutes.
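
The state can be watched in the console, or polled from the CLI roughly like this (a sketch; the application name `flink-cp-demo` is an assumption based on the naming used elsewhere in this demo):
```shell
# Prints the current status, e.g. STARTING, then eventually RUNNING
aws kinesisanalyticsv2 describe-application --application-name flink-cp-demo \
    --query 'ApplicationDetail.ApplicationStatus' --output text
```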

#### Monitor the CloudWatch logging output

See the note above re: logging config. Until the bug is fixed, no log output will be available.

The following script will show all the log entries from the start of application launch, and will
wait for new entries to arrive and display them too. The script will resume from where it
left off if shut down via Ctrl-C. To start from scratch, remove the `.cwlogs` directory.
```shell
./demo-tail-logs.sh
```

#### Send sample events to the ingress stream
```shell
./demo-send-events.sh
```

#### Get and display the events published to the egress stream
This script will show all events published to the egress stream since the start of application launch, and will
wait for new entries to arrive and display them too.
```shell
./demo-tail-egress.sh
```

#### Cleanup

Manually delete the files in the S3 bucket, and delete the Kinesis stream `flink-cp-demo-ingress` (the Flink application
adds a fanout consumer to the stream which will block any deletion attempted by Crossplane).

```
aws s3 rm --recursive s3://$(aws s3 ls | grep flink-cp-demo | awk '{print $3}')
aws kinesis delete-stream --enforce-consumer-deletion --stream-name flink-cp-demo-ingress
```

Run the following commands to delete the remaining resources:
```
kubectl delete -f claims/managed-flink-claim.yaml
kubectl delete -f claims/demo-setup-claims.yaml
```

Shut down the local IDP with the command:
```
idpbuilder delete
```

17 changes: 17 additions & 0 deletions aws-cloudformation/demo-send-events.sh
@@ -0,0 +1,17 @@
```bash
#! /bin/bash

# Use the platform-appropriate MD5 command (macOS ships `md5`, Linux `md5sum`)
if [ $(uname) = "Darwin" ]; then
    MD5SUM=md5
else
    MD5SUM=md5sum
fi

# Look up the ingress stream created by the flink-cf-demo CloudFormation stack
stream_name=$(aws kinesis list-streams | jq -crM .StreamNames[] | grep FlinkCfDemoIngressStream)

# Send each event (excluding lines matching 'test.action') to the ingress stream, using
# the MD5 of the event as the partition key and the base64-encoded event as the data
grep -v test.action ../src/test/resources/product-cart-integration-test-events.jsonl | while read line; do
    partkey=$(echo $line | $MD5SUM | awk '{print $1}')
    data=$(echo $line | base64)
    cmd="aws kinesis put-record --stream-name $stream_name --partition-key $partkey --data $data"
    echo $cmd
    eval $cmd
done
```