#!/usr/bin/env bash
set -euo pipefail

# Prepare the Delta tables on MinIO so that `deltalake` sink validation can pass:
# the table at RISEDEV_DELTALAKE_LOCATION and a second `deltalake-test-exactly-once`
# table in the same bucket.
# This mirrors `ci/scripts/e2e-deltalake-sink-rust-test.sh`, but is runnable locally.
#
# Required settings (taken from the environment, or from the risedev-env file
# when RISEDEV_DELTALAKE_LOCATION is not already set):
#   RISEDEV_DELTALAKE_LOCATION        s3a://<bucket>/<path> of the table
#   RISEDEV_DELTALAKE_S3_ENDPOINT     S3 endpoint passed to Spark
#   RISEDEV_DELTALAKE_S3_ACCESS_KEY   S3 access key passed to Spark
#   RISEDEV_DELTALAKE_S3_SECRET_KEY   S3 secret key passed to Spark

# Absolute directory containing this script. CDPATH is cleared for this `cd`
# because, with a relative script path, an inherited CDPATH can make `cd`
# resolve elsewhere and print the directory it picked, which would end up in
# the captured value. `--` guards against a directory name starting with `-`.
script_dir="$(CDPATH='' cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
# Repository root: two levels above the script directory. The path is already
# absolute here, so CDPATH does not apply.
repo_root="$(cd "${script_dir}/../.." && pwd)"

# Location of the risedev-env file under the repository's .risingwave/config directory.
env_file="${repo_root}/.risingwave/config/risedev-env"

# Standalone convenience: when the table location has not been provided by the
# caller and the env file exists, load the settings from that file instead of
# requiring the caller to source it manually.
if [[ -z "${RISEDEV_DELTALAKE_LOCATION:-}" && -f "${env_file}" ]]; then
  # While `set -a` is active, every variable assigned by the sourced file is
  # marked for export; `set +a` switches that off again afterwards.
  set -a
  # shellcheck source=/dev/null
  . "${env_file}"
  set +a
fi

# Fail fast when a required setting is unset or empty: `${VAR:?missing}` makes
# bash report the variable name with the message "missing" and abort the script.
# The values are expected to come from the caller's environment or from the
# risedev-env file sourced above.
: "${RISEDEV_DELTALAKE_LOCATION:?missing}"
: "${RISEDEV_DELTALAKE_S3_ENDPOINT:?missing}"
: "${RISEDEV_DELTALAKE_S3_ACCESS_KEY:?missing}"
: "${RISEDEV_DELTALAKE_S3_SECRET_KEY:?missing}"

# Scratch directory inside the repository; the Spark distribution is downloaded
# and unpacked here.
work_dir="${repo_root}/.risingwave/.deltalake"
mkdir -p "${work_dir}"

# Only `s3a://<bucket>/<path>` locations are supported, and both parts must be
# non-empty:
#   - `[!/]*` requires the bucket name to start with a non-slash character, so
#     `s3a:///path` (empty bucket) is rejected;
#   - `?*` requires at least one character of table path, so `s3a://bucket/`
#     is rejected. Without this, the cleanup step later in this script would
#     recursively remove `hummock-minio/<bucket>/`, i.e. the whole bucket.
if [[ "${RISEDEV_DELTALAKE_LOCATION}" != s3a://[!/]*/?* ]]; then
  echo "unsupported deltalake location: ${RISEDEV_DELTALAKE_LOCATION}" >&2
  exit 1
fi
# `<bucket>/<path>`: the location without its scheme.
deltalake_path="${RISEDEV_DELTALAKE_LOCATION#s3a://}"
# `<bucket>`: everything before the first slash.
deltalake_bucket="${deltalake_path%%/*}"
# Second table used by the exactly-once test; it lives in the same bucket.
exactly_once_location="s3a://${deltalake_bucket}/deltalake-test-exactly-once"
exactly_once_path="${exactly_once_location#s3a://}"

# Run `./risedev mc <args...>` from the repository root. The subshell keeps the
# directory change from affecting the rest of the script.
risedev_mc() {
  (cd "${repo_root}" && ./risedev mc "$@")
}

# Remove whatever is stored for one table.
#   $1 - table location, used only in the log line
#   $2 - `<bucket>/<path>` of the table below the `hummock-minio` alias
# Best effort: a failing removal is deliberately ignored.
clean_table() {
  local location="$1"
  local object_path="$2"
  echo "--- cleaning existing deltalake table at ${location}"
  risedev_mc rm --recursive --force "hummock-minio/${object_path}" || true
}

echo "--- creating minio bucket"
risedev_mc mb --ignore-existing "hummock-minio/${deltalake_bucket}"

clean_table "${RISEDEV_DELTALAKE_LOCATION}" "${deltalake_path}"
clean_table "${exactly_once_location}" "${exactly_once_path}"

# Spark release used to create the tables, and the paths derived from it.
spark_version="4.0.2"
# Name shared by the archive and by the directory it unpacks to.
spark_dist="spark-${spark_version}-bin-hadoop3"
spark_tgz="${spark_dist}.tgz"
spark_dir="${work_dir}/${spark_dist}"
# Launcher whose presence marks the distribution as installed.
spark_sql="${spark_dir}/bin/spark-sql"

# Pick the java executable: search PATH when JAVA_HOME is unset or empty,
# otherwise use the binary under JAVA_HOME.
if [[ -z "${JAVA_HOME:-}" ]]; then
  # `|| true` keeps a missing java from aborting here; the check below reports it.
  java_bin="$(type -p java || true)"
else
  java_bin="${JAVA_HOME}/bin/java"
fi

if [[ ! -x "${java_bin}" ]]; then
  echo "Java not found. Please install Java 17 or 21." >&2
  exit 1
fi

# Major version: take the quoted string on the `version` line of the
# `-version` output (stderr is merged in) and keep what precedes the first dot.
java_ver="$("${java_bin}" -version 2>&1 | awk -F '"' '/version/ {print $2}' | cut -d'.' -f1)"
case "${java_ver}" in
  17 | 21) ;;
  *)
    echo "Only Java 17/21 are supported for Spark ${spark_version}. Current version: ${java_ver}" >&2
    exit 1
    ;;
esac

# Install the Spark distribution into the work directory unless its spark-sql
# launcher is already present and executable.
if [[ ! -x "${spark_sql}" ]]; then
  echo "--- downloading spark (once)"
  # Absolute paths and `tar -C` are used instead of `cd`, so the working
  # directory of the rest of the script does not depend on whether a download
  # was needed.
  spark_archive="${work_dir}/${spark_tgz}"
  if [[ ! -f "${spark_archive}" ]]; then
    # Download under a temporary name and rename only after curl succeeded.
    # An interrupted or failed transfer then never leaves a truncated archive
    # behind that later runs would treat as complete and fail to unpack.
    curl -fLsS -o "${spark_archive}.part" "https://rw-ci-deps-dist.s3.amazonaws.com/${spark_tgz}"
    mv -f -- "${spark_archive}.part" "${spark_archive}"
  fi
  tar -xf "${spark_archive}" -C "${work_dir}" --no-same-owner
fi

# Maven coordinates handed to spark-sql via --packages: Delta Lake for Spark
# and the Hadoop AWS (s3a) module.
DEPENDENCIES="io.delta:delta-spark_2.13:4.0.1,org.apache.hadoop:hadoop-aws:3.4.1"
# Drop any inherited SPARK_HOME before launching the downloaded spark-sql
# (presumably so a different local Spark installation is not picked up).
unset SPARK_HOME

# Options for spark-sql: Delta extension and catalog, UTC session time zone,
# and the s3a credentials/endpoint taken from the RISEDEV_DELTALAKE_* settings.
spark_args=(
  --packages "${DEPENDENCIES}"
  --conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension'
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog'
  --conf 'spark.sql.session.timeZone=UTC'
  --conf "spark.hadoop.fs.s3a.access.key=${RISEDEV_DELTALAKE_S3_ACCESS_KEY}"
  --conf "spark.hadoop.fs.s3a.secret.key=${RISEDEV_DELTALAKE_S3_SECRET_KEY}"
  --conf "spark.hadoop.fs.s3a.endpoint=${RISEDEV_DELTALAKE_S3_ENDPOINT}"
  --conf 'spark.hadoop.fs.s3a.path.style.access=true'
)

# Both tables share the same column list.
table_columns='v1 int, v2 short, v3 long, v4 float, v5 double, v6 string, v7 date, v8 Timestamp, v9 boolean, v10 decimal, v11 ARRAY<decimal>'
create_tables_sql="
    create table delta.\`${RISEDEV_DELTALAKE_LOCATION}\`(${table_columns}) using delta;
    create table delta.\`${exactly_once_location}\`(${table_columns}) using delta;
  "

echo "--- creating deltalake table at ${RISEDEV_DELTALAKE_LOCATION}"
"${spark_sql}" "${spark_args[@]}" --S --e "${create_tables_sql}"

echo "--- done"
