#define USE_THE_REPOSITORY_VARIABLE

#include "../git-compat-util.h"
#include "../abspath.h"
#include "../chdir-notify.h"
#include "../config.h"
#include "../dir.h"
#include "../environment.h"
#include "../fsck.h"
#include "../gettext.h"
#include "../hash.h"
#include "../hex.h"
#include "../iterator.h"
#include "../ident.h"
#include "../object.h"
#include "../path.h"
#include "../refs.h"
#include "../reftable/reftable-basics.h"
#include "../reftable/reftable-error.h"
#include "../reftable/reftable-fsck.h"
#include "../reftable/reftable-iterator.h"
#include "../reftable/reftable-record.h"
#include "../reftable/reftable-stack.h"
#include "../repo-settings.h"
#include "../setup.h"
#include "../strmap.h"
#include "../trace2.h"
#include "../write-or-die.h"
#include "parse.h"
#include "refs-internal.h"

/*
 * Used as a flag in ref_update::flags when the ref_update was via an
 * update to HEAD.
 */
#define REF_UPDATE_VIA_HEAD (1 << 8)

/*
 * A single reftable stack together with a ref iterator that is kept around
 * between lookups. The iterator is set up lazily by
 * `reftable_backend_read_ref()` and is destroyed again whenever the stack
 * reloads (see `reftable_backend_on_reload()`) or the backend is released.
 */
struct reftable_backend {
	/* The reftable stack that this backend reads from and writes to. */
	struct reftable_stack *stack;
	/* Cached ref iterator; `it.ops` is unset while no iterator exists. */
	struct reftable_iterator it;
};

/*
 * Reload callback installed on every stack by `reftable_backend_init()`.
 * `payload` is the owning `struct reftable_backend`; its cached iterator is
 * destroyed so that the next lookup has to create a fresh one.
 */
static void reftable_backend_on_reload(void *payload)
{
	struct reftable_backend *backend = payload;

	reftable_iterator_destroy(&backend->it);
}

static int reftable_backend_init(struct reftable_backend *be,
				 const char *path,
				 const struct reftable_write_options *_opts)
{
	struct reftable_write_options opts = *_opts;
	opts.on_reload = reftable_backend_on_reload;
	opts.on_reload_payload = be;
	return reftable_new_stack(&be->stack, path, &opts);
}

/*
 * Release the resources held by `be`: destroy its reftable stack, clear the
 * stack pointer and destroy the cached ref iterator. The `struct
 * reftable_backend` itself is not freed.
 */
static void reftable_backend_release(struct reftable_backend *be)
{
	reftable_stack_destroy(be->stack);
	be->stack = NULL;
	reftable_iterator_destroy(&be->it);
}

static int reftable_backend_read_ref(struct reftable_backend *be,
				     const char *refname,
				     struct object_id *oid,
				     struct strbuf *referent,
				     unsigned int *type)
{
	struct reftable_ref_record ref = {0};
	int ret;

	if (!be->it.ops) {
		ret = reftable_stack_init_ref_iterator(be->stack, &be->it);
		if (ret)
			goto done;
	}

	ret = reftable_iterator_seek_ref(&be->it, refname);
	if (ret)
		goto done;

	ret = reftable_iterator_next_ref(&be->it, &ref);
	if (ret)
		goto done;

	if (strcmp(ref.refname, refname)) {
		ret = 1;
		goto done;
	}

	if (ref.value_type == REFTABLE_REF_SYMREF) {
		strbuf_reset(referent);
		strbuf_addstr(referent, ref.value.symref);
		*type |= REF_ISSYMREF;
	} else if (reftable_ref_record_val1(&ref)) {
		unsigned int hash_id;

		switch (reftable_stack_hash_id(be->stack)) {
		case REFTABLE_HASH_SHA1:
			hash_id = GIT_HASH_SHA1;
			break;
		case REFTABLE_HASH_SHA256:
			hash_id = GIT_HASH_SHA256;
			break;
		default:
			BUG("unhandled hash ID %d", reftable_stack_hash_id(be->stack));
		}

		oidread(oid, reftable_ref_record_val1(&ref),
			&hash_algos[hash_id]);
	} else {
		/* We got a tombstone, which should not happen. */
		BUG("unhandled reference value type %d", ref.value_type);
	}

done:
	assert(ret != REFTABLE_API_ERROR);
	reftable_ref_record_release(&ref);
	return ret;
}

/*
 * The reftable implementation of a ref store. `base` is the generic ref store
 * header; `reftable_be_downcast()` casts a `struct ref_store *` back to this
 * type.
 */
struct reftable_ref_store {
	struct ref_store base;

	/*
	 * The main backend refers to the common dir and thus contains common
	 * refs as well as refs of the main repository.
	 */
	struct reftable_backend main_backend;
	/*
	 * The worktree backend refers to the gitdir in case the refdb is opened
	 * via a worktree. It thus contains the per-worktree refs.
	 */
	struct reftable_backend worktree_backend;
	/*
	 * Map of worktree backends by their respective worktree names. The map
	 * is populated lazily when we try to resolve `worktrees/$worktree` refs.
	 */
	struct strmap worktree_backends;
	/* Options handed to every reftable stack that this store opens. */
	struct reftable_write_options write_options;

	/* Capabilities of this store, verified by `reftable_be_downcast()`. */
	unsigned int store_flags;
	/* Value of `repo_settings_get_log_all_ref_updates()` at init time. */
	enum log_refs_config log_all_ref_updates;
	/* Result of the most recent attempt to set up a reftable stack. */
	int err;
};

/*
 * Convert a generic ref_store into the reftable_ref_store it is embedded in.
 *
 * It is a BUG if `ref_store` is not backed by the reftable backend, or if the
 * store lacks any of the capabilities listed in `required_flags`. `caller`
 * names the operation and only shows up in those BUG messages.
 */
static struct reftable_ref_store *reftable_be_downcast(struct ref_store *ref_store,
						       unsigned int required_flags,
						       const char *caller)
{
	struct reftable_ref_store *refs;
	unsigned int missing_flags;

	if (ref_store->be != &refs_be_reftable)
		BUG("ref_store is type \"%s\" not \"reftables\" in %s",
		    ref_store->be->name, caller);

	refs = (struct reftable_ref_store *)ref_store;

	/* Any required bit that is not set in the store is a violation. */
	missing_flags = required_flags & ~refs->store_flags;
	if (missing_flags)
		BUG("operation %s requires abilities 0x%x, but only have 0x%x",
		    caller, required_flags, refs->store_flags);

	return refs;
}

/*
 * Some refs are global to the repository (refs/heads/{*}), while others are
 * local to the worktree (eg. HEAD, refs/bisect/{*}). We solve this by having
 * multiple separate databases (ie. multiple reftable/ directories), one for
 * the shared refs, one for the current worktree refs, and one for each
 * additional worktree. For reading, we merge the view of both the shared and
 * the current worktree's refs, when necessary.
 *
 * This function also optionally assigns the rewritten reference name that is
 * local to the stack. This translation is required when using worktree refs
 * like `worktrees/$worktree/refs/heads/foo` as worktree stacks will store
 * those references in their normalized form.
 *
 * Parameters:
 *   out           - receives the backend responsible for `refname`.
 *   store         - the ref store whose backends are searched.
 *   refname       - reference to look up; NULL selects the main backend.
 *   rewritten_ref - passed through to `parse_worktree_ref()`.
 *   reload        - when non-zero, the selected stack is reloaded before it
 *                   is returned.
 *
 * Returns 0 on success. Returns a non-zero error code, leaving `*out`
 * untouched, when the stack of another worktree cannot be opened or when
 * reloading the selected stack fails.
 */
static int backend_for(struct reftable_backend **out,
		       struct reftable_ref_store *store,
		       const char *refname,
		       const char **rewritten_ref,
		       int reload)
{
	struct reftable_backend *be;
	const char *wtname;
	int wtname_len;

	if (!refname) {
		be = &store->main_backend;
		goto out;
	}

	switch (parse_worktree_ref(refname, &wtname, &wtname_len, rewritten_ref)) {
	case REF_WORKTREE_OTHER: {
		static struct strbuf wtname_buf = STRBUF_INIT;
		struct strbuf wt_dir = STRBUF_INIT;

		/*
		 * We're using a static buffer here so that we don't need to
		 * allocate the worktree name whenever we look up a reference.
		 * This could be avoided if the strmap interface knew how to
		 * handle keys with a length.
		 */
		strbuf_reset(&wtname_buf);
		strbuf_add(&wtname_buf, wtname, wtname_len);

		/*
		 * There is an edge case here: when the worktree references the
		 * current worktree, then we set up the stack once via
		 * `worktree_backends` and once via `worktree_backend`. This is
		 * wasteful, but in the reading case it shouldn't matter. And
		 * in the writing case we would notice that the stack is locked
		 * already and error out when trying to write a reference via
		 * both stacks.
		 */
		be = strmap_get(&store->worktree_backends, wtname_buf.buf);
		if (!be) {
			strbuf_addf(&wt_dir, "%s/worktrees/%s/reftable",
				    store->base.repo->commondir, wtname_buf.buf);

			CALLOC_ARRAY(be, 1);
			store->err = reftable_backend_init(be, wt_dir.buf,
							   &store->write_options);
			assert(store->err != REFTABLE_API_ERROR);

			/*
			 * Do not cache or hand out a backend whose stack could
			 * not be opened: its stack pointer is unusable, and
			 * both the reload below and every caller would
			 * dereference it.
			 */
			if (store->err) {
				reftable_backend_release(be);
				free(be);
				strbuf_release(&wt_dir);
				return store->err;
			}

			strmap_put(&store->worktree_backends, wtname_buf.buf, be);
		}

		strbuf_release(&wt_dir);
		goto out;
	}
	case REF_WORKTREE_CURRENT:
		/*
		 * If there is no worktree stack then we're currently in the
		 * main worktree. We thus return the main stack in that case.
		 */
		if (!store->worktree_backend.stack)
			be = &store->main_backend;
		else
			be = &store->worktree_backend;
		goto out;
	case REF_WORKTREE_MAIN:
	case REF_WORKTREE_SHARED:
		be = &store->main_backend;
		goto out;
	default:
		BUG("unhandled worktree reference type");
	}

out:
	if (reload) {
		int ret = reftable_stack_reload(be->stack);
		if (ret)
			return ret;
	}
	*out = be;

	return 0;
}

static int should_write_log(struct reftable_ref_store *refs, const char *refname)
{
	enum log_refs_config log_refs_cfg = refs->log_all_ref_updates;
	if (log_refs_cfg == LOG_REFS_UNSET)
		log_refs_cfg = is_bare_repository() ? LOG_REFS_NONE : LOG_REFS_NORMAL;

	switch (log_refs_cfg) {
	case LOG_REFS_NONE:
		return refs_reflog_exists(&refs->base, refname);
	case LOG_REFS_ALWAYS:
		return 1;
	case LOG_REFS_NORMAL:
		if (should_autocreate_reflog(log_refs_cfg, refname))
			return 1;
		return refs_reflog_exists(&refs->base, refname);
	default:
		BUG("unhandled core.logAllRefUpdates value %d", log_refs_cfg);
	}
}

/*
 * Turn `log` into an update record carrying the identity described by
 * `split`. Any previous contents of `log` are released first. Name and email
 * are copied, the timestamp is parsed from `split->date_begin`, and the
 * timezone offset is parsed from `split->tz_begin`, honoring a leading sign.
 */
static void fill_reftable_log_record(struct reftable_log_record *log, const struct ident_split *split)
{
	const char *tz;
	int tz_sign = 1;

	reftable_log_record_release(log);

	log->value_type = REFTABLE_LOG_UPDATE;
	log->value.update.name = xstrndup(split->name_begin,
					  split->name_end - split->name_begin);
	log->value.update.email = xstrndup(split->mail_begin,
					   split->mail_end - split->mail_begin);
	log->value.update.time = atol(split->date_begin);

	/* Strip the sign so that only the digits get parsed below. */
	tz = split->tz_begin;
	if (*tz == '-') {
		tz_sign = -1;
		tz++;
	}
	if (*tz == '+') {
		tz_sign = 1;
		tz++;
	}

	log->value.update.tz_offset = tz_sign * atoi(tz);
}

/*
 * Config callback that translates `reftable.*` settings into reftable write
 * options. `_opts` is the `struct reftable_write_options` to fill in.
 *
 * Recognized keys are `reftable.blocksize`, `reftable.restartinterval`,
 * `reftable.indexobjects`, `reftable.geometricfactor` and
 * `reftable.locktimeout`. Values outside of the accepted range make the
 * process die; unknown keys are ignored. Always returns 0.
 */
static int reftable_be_config(const char *var, const char *value,
			      const struct config_context *ctx,
			      void *_opts)
{
	struct reftable_write_options *opts = _opts;

	if (!strcmp(var, "reftable.blocksize")) {
		unsigned long block_size = git_config_ulong(var, value, ctx->kvi);
		if (block_size > 16777215)
			die("reftable block size cannot exceed 16MB");
		opts->block_size = block_size;
	} else if (!strcmp(var, "reftable.restartinterval")) {
		unsigned long restart_interval = git_config_ulong(var, value, ctx->kvi);
		/* Name the setting that was actually rejected. */
		if (restart_interval > UINT16_MAX)
			die("reftable restart interval cannot exceed %u", (unsigned)UINT16_MAX);
		opts->restart_interval = restart_interval;
	} else if (!strcmp(var, "reftable.indexobjects")) {
		/* The option is stored inverted: "index objects" vs. "skip". */
		opts->skip_index_objects = !git_config_bool(var, value);
	} else if (!strcmp(var, "reftable.geometricfactor")) {
		unsigned long factor = git_config_ulong(var, value, ctx->kvi);
		if (factor > UINT8_MAX)
			die("reftable geometric factor cannot exceed %u", (unsigned)UINT8_MAX);
		opts->auto_compaction_factor = factor;
	} else if (!strcmp(var, "reftable.locktimeout")) {
		int64_t lock_timeout = git_config_int64(var, value, ctx->kvi);
		if (lock_timeout > LONG_MAX)
			die("reftable lock timeout cannot exceed %"PRIdMAX, (intmax_t)LONG_MAX);
		/* -1 is the only negative value that is accepted. */
		if (lock_timeout < 0 && lock_timeout != -1)
			die("reftable lock timeout does not support negative values other than -1");
		opts->lock_timeout_ms = lock_timeout;
	}

	return 0;
}

/*
 * fsync callback handed to the reftable library via the write options. The
 * file descriptor is synced as part of the "reference" fsync component.
 */
static int reftable_be_fsync(int fd)
{
	int ret = fsync_component(FSYNC_COMPONENT_REFERENCE, fd);

	return ret;
}

/*
 * Allocate and set up a reftable ref store for the repository `repo` whose
 * git directory is `gitdir`. `store_flags` records the capabilities of the
 * store and is later verified by `reftable_be_downcast()`.
 *
 * The write options are derived from the repository's hash algorithm, the
 * process umask, the GIT_TEST_REFTABLE_AUTOCOMPACTION environment variable
 * and the `reftable.*` configuration. Afterwards the main stack and, when
 * `gitdir` belongs to a worktree, the per-worktree stack are opened.
 *
 * The store is returned even when a stack could not be opened; in that case
 * the error code is recorded in the store's `err` field.
 */
static struct ref_store *reftable_be_init(struct repository *repo,
					  const char *gitdir,
					  unsigned int store_flags)
{
	struct reftable_ref_store *refs = xcalloc(1, sizeof(*refs));
	struct strbuf path = STRBUF_INIT;
	int is_worktree;
	mode_t mask;

	/*
	 * The umask can only be queried by setting it, so set it to zero and
	 * immediately restore the previous value.
	 */
	mask = umask(0);
	umask(mask);

	base_ref_store_init(&refs->base, repo, gitdir, &refs_be_reftable);
	strmap_init(&refs->worktree_backends);
	refs->store_flags = store_flags;
	refs->log_all_ref_updates = repo_settings_get_log_all_ref_updates(repo);

	/* Map the repository's hash algorithm to the reftable hash ID. */
	switch (repo->hash_algo->format_id) {
	case GIT_SHA1_FORMAT_ID:
		refs->write_options.hash_id = REFTABLE_HASH_SHA1;
		break;
	case GIT_SHA256_FORMAT_ID:
		refs->write_options.hash_id = REFTABLE_HASH_SHA256;
		break;
	default:
		BUG("unknown hash algorithm %d", repo->hash_algo->format_id);
	}
	refs->write_options.default_permissions = calc_shared_perm(the_repository, 0666 & ~mask);
	refs->write_options.disable_auto_compact =
		!git_env_bool("GIT_TEST_REFTABLE_AUTOCOMPACTION", 1);
	/* Default that `reftable.locktimeout` may override below. */
	refs->write_options.lock_timeout_ms = 100;
	refs->write_options.fsync = reftable_be_fsync;

	repo_config(the_repository, reftable_be_config, &refs->write_options);

	/*
	 * It is somewhat unfortunate that we have to mirror the default block
	 * size of the reftable library here. But given that the write options
	 * wouldn't be updated by the library here, and given that we require
	 * the proper block size to trim reflog message so that they fit, we
	 * must set up a proper value here.
	 */
	if (!refs->write_options.block_size)
		refs->write_options.block_size = 4096;

	/*
	 * Set up the main reftable stack that is hosted in GIT_COMMON_DIR.
	 * This stack contains both the shared and the main worktree refs.
	 *
	 * Note that we don't try to resolve the path in case we have a
	 * worktree because `get_common_dir_noenv()` already does it for us.
	 */
	is_worktree = get_common_dir_noenv(&path, gitdir);
	if (!is_worktree) {
		strbuf_reset(&path);
		strbuf_realpath(&path, gitdir, 0);
	}
	strbuf_addstr(&path, "/reftable");
	refs->err = reftable_backend_init(&refs->main_backend, path.buf,
					  &refs->write_options);
	if (refs->err)
		goto done;

	/*
	 * If we're in a worktree we also need to set up the worktree reftable
	 * stack that is contained in the per-worktree GIT_DIR.
	 *
	 * Ideally, we would also add the stack to our worktree stack map. But
	 * we have no way to figure out the worktree name here and thus can't
	 * do it efficiently.
	 */
	if (is_worktree) {
		strbuf_reset(&path);
		strbuf_addf(&path, "%s/reftable", gitdir);

		refs->err = reftable_backend_init(&refs->worktree_backend, path.buf,
						  &refs->write_options);
		if (refs->err)
			goto done;
	}

	/* Only reached when all required stacks were opened successfully. */
	chdir_notify_reparent("reftables-backend $GIT_DIR", &refs->base.gitdir);

done:
	assert(refs->err != REFTABLE_API_ERROR);
	strbuf_release(&path);
	return &refs->base;
}

static void reftable_be_release(struct ref_store *ref_store)
{
	struct reftable_ref_store *refs = reftable_be_downcast(ref_store, 0, "release");
	struct strmap_entry *entry;
	struct hashmap_iter iter;

	if (refs->main_backend.stack)
		reftable_backend_release(&refs->main_backend);
	if (refs->worktree_backend.stack)
		reftable_backend_release(&refs->worktree_backend);

	strmap_for_each_entry(&refs->worktree_backends, &iter, entry) {
		struct reftable_backend *be = entry->value;
		reftable_backend_release(be);
		free(be);
	}
	strmap_clear(&refs->worktree_backends, 0);
}

/*
 * Create the on-disk layout of a reftable ref store inside the store's git
 * directory: the "reftable" and "refs" directories, a stub "HEAD" file and a
 * stub "refs/heads" file. Both stub files get their permissions adjusted for
 * shared repositories.
 *
 * `flags` and `err` are unused. Always returns 0.
 */
static int reftable_be_create_on_disk(struct ref_store *ref_store,
				      int flags UNUSED,
				      struct strbuf *err UNUSED)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_WRITE, "create");
	struct strbuf path = STRBUF_INIT;

	/* Directory that hosts the tables. */
	strbuf_addf(&path, "%s/reftable", refs->base.gitdir);
	safe_create_dir(the_repository, path.buf, 1);

	/* Stub HEAD that points at a deliberately invalid branch. */
	strbuf_reset(&path);
	strbuf_addf(&path, "%s/HEAD", refs->base.gitdir);
	write_file(path.buf, "ref: refs/heads/.invalid");
	adjust_shared_perm(the_repository, path.buf);

	/* Directory holding the "heads" stub. */
	strbuf_reset(&path);
	strbuf_addf(&path, "%s/refs", refs->base.gitdir);
	safe_create_dir(the_repository, path.buf, 1);

	/* "refs/heads" is a regular file rather than a directory. */
	strbuf_reset(&path);
	strbuf_addf(&path, "%s/refs/heads", refs->base.gitdir);
	write_file(path.buf, "this repository uses the reftable format");
	adjust_shared_perm(the_repository, path.buf);

	strbuf_release(&path);
	return 0;
}

static int reftable_be_remove_on_disk(struct ref_store *ref_store,
				      struct strbuf *err)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_WRITE, "remove");
	struct strbuf sb = STRBUF_INIT;
	int ret = 0;

	/*
	 * Release the ref store such that all stacks are closed. This is
	 * required so that the "tables.list" file is not open anymore, which
	 * would otherwise make it impossible to remove the file on Windows.
	 */
	reftable_be_release(ref_store);

	strbuf_addf(&sb, "%s/reftable", refs->base.gitdir);
	if (remove_dir_recursively(&sb, 0) < 0) {
		strbuf_addf(err, "could not delete reftables: %s",
			    strerror(errno));
		ret = -1;
	}
	strbuf_reset(&sb);

	strbuf_addf(&sb, "%s/HEAD", refs->base.gitdir);
	if (unlink(sb.buf) < 0) {
		strbuf_addf(err, "could not delete stub HEAD: %s",
			    strerror(errno));
		ret = -1;
	}
	strbuf_reset(&sb);

	strbuf_addf(&sb, "%s/refs/heads", refs->base.gitdir);
	if (unlink(sb.buf) < 0) {
		strbuf_addf(err, "could not delete stub heads: %s",
			    strerror(errno));
		ret = -1;
	}
	strbuf_reset(&sb);

	strbuf_addf(&sb, "%s/refs", refs->base.gitdir);
	if (rmdir(sb.buf) < 0) {
		strbuf_addf(err, "could not delete refs directory: %s",
			    strerror(errno));
		ret = -1;
	}

	strbuf_release(&sb);
	return ret;
}

/*
 * Ref iterator over a single reftable stack. The generic `struct
 * ref_iterator` pointers handed to the vtable functions are cast back to this
 * type, with `base` being the first member.
 */
struct reftable_ref_iterator {
	/* Generic iterator state that is exposed to callers. */
	struct ref_iterator base;
	/* The ref store that this iterator belongs to. */
	struct reftable_ref_store *refs;
	/* Underlying reftable iterator that yields the raw records. */
	struct reftable_iterator iter;
	/* The record that was read most recently. */
	struct reftable_ref_record ref;
	/* Object ID of the current reference; `base.oid` points here. */
	struct object_id oid;

	/*
	 * Prefix that yielded references must match, and its length. Both are
	 * set when seeking with REF_ITERATOR_SEEK_SET_PREFIX.
	 */
	char *prefix;
	size_t prefix_len;
	/*
	 * NULL-terminated list of exclude patterns as computed by
	 * `filter_exclude_patterns()`, or NULL when there are none.
	 */
	char **exclude_patterns;
	/* Index of the exclude pattern that is currently being matched. */
	size_t exclude_patterns_index;
	/* Cached length of the current pattern; 0 when not yet computed. */
	size_t exclude_patterns_strlen;
	/* DO_FOR_EACH_* flags that control which references are yielded. */
	unsigned int flags;
	/* 0 while iterating, positive once done, negative on error. */
	int err;
};

/*
 * Check the current reference against the exclude patterns.
 *
 * Returns `1` when the reference matches an exclude pattern and thus must not
 * be shown, in which case the underlying iterator has already been re-seeked
 * past all references sharing that pattern as prefix. Returns `0` when the
 * reference shall be shown.
 */
static int should_exclude_current_ref(struct reftable_ref_iterator *iter)
{
	const char *pattern;

	while ((pattern = iter->exclude_patterns[iter->exclude_patterns_index])) {
		char *seek_key;
		int cmp;

		/*
		 * The pattern length is computed once per pattern and then
		 * reused across calls until we move on to the next pattern.
		 */
		if (!iter->exclude_patterns_strlen)
			iter->exclude_patterns_strlen = strlen(pattern);

		cmp = strncmp(iter->ref.refname, pattern,
			      iter->exclude_patterns_strlen);

		/*
		 * A reference that sorts before the pattern cannot match it,
		 * so it is shown.
		 */
		if (cmp < 0)
			return 0;

		/*
		 * A reference that sorts after the pattern means that neither
		 * it nor any later reference can match this pattern anymore.
		 * Retry with the next pattern.
		 */
		if (cmp > 0) {
			iter->exclude_patterns_strlen = 0;
			iter->exclude_patterns_index++;
			continue;
		}

		/*
		 * The pattern is a prefix of the reference. Skip the whole
		 * block of matching references by seeking to the pattern with
		 * 0xff appended: every name that continues the pattern with a
		 * byte in [0x00, 0xfe] sorts before that key, and 0xff is not
		 * expected to show up in reference names.
		 *
		 * The reference we end up at may be excluded by the next
		 * pattern. The caller loops and re-checks it for us.
		 */
		seek_key = xstrfmt("%s%c", pattern, 0xff);
		iter->err = reftable_iterator_seek_ref(&iter->iter, seek_key);
		free(seek_key);

		iter->exclude_patterns_strlen = 0;
		iter->exclude_patterns_index++;
		trace2_counter_add(TRACE2_COUNTER_ID_REFTABLE_RESEEKS, 1);
		return 1;
	}

	return 0;
}

/*
 * Advance the iterator to the next reference that shall be yielded and
 * publish it via `iter->base` (refname, referent, oid and flags).
 *
 * Records are skipped when they are neither in "refs/" nor a requested root
 * ref, when they match an exclude pattern, when only per-worktree refs were
 * requested and the ref is not one, when they are dangling symrefs that shall
 * be omitted, or when they do not resolve to an object and broken refs were
 * not requested. Iteration stops at the first record that no longer matches
 * the prefix.
 *
 * Returns ITER_OK when a reference was found, ITER_DONE when the iterator is
 * exhausted and ITER_ERROR when the underlying iterator reported an error.
 */
static int reftable_ref_iterator_advance(struct ref_iterator *ref_iterator)
{
	struct reftable_ref_iterator *iter =
		(struct reftable_ref_iterator *)ref_iterator;
	struct reftable_ref_store *refs = iter->refs;

	while (!iter->err) {
		/*
		 * Both are per-record state and thus have to start out clean
		 * for every record. Otherwise the referent of a symref that
		 * got skipped further down would be reported for a following
		 * reference that is not a symref.
		 */
		const char *referent = NULL;
		int flags = 0;

		iter->err = reftable_iterator_next_ref(&iter->iter, &iter->ref);
		if (iter->err)
			break;

		/*
		 * The files backend only lists references contained in "refs/" unless
		 * the root refs are to be included. We emulate the same behaviour here.
		 */
		if (!starts_with(iter->ref.refname, "refs/") &&
		    !(iter->flags & DO_FOR_EACH_INCLUDE_ROOT_REFS &&
		      is_root_ref(iter->ref.refname))) {
			continue;
		}

		/* References are sorted, so nothing after this one can match. */
		if (iter->prefix_len &&
		    strncmp(iter->prefix, iter->ref.refname, iter->prefix_len)) {
			iter->err = 1;
			break;
		}

		if (iter->exclude_patterns && should_exclude_current_ref(iter))
			continue;

		if (iter->flags & DO_FOR_EACH_PER_WORKTREE_ONLY &&
		    parse_worktree_ref(iter->ref.refname, NULL, NULL, NULL) !=
			    REF_WORKTREE_CURRENT)
			continue;

		switch (iter->ref.value_type) {
		case REFTABLE_REF_VAL1:
			oidread(&iter->oid, iter->ref.value.val1,
				refs->base.repo->hash_algo);
			break;
		case REFTABLE_REF_VAL2:
			oidread(&iter->oid, iter->ref.value.val2.value,
				refs->base.repo->hash_algo);
			break;
		case REFTABLE_REF_SYMREF:
			referent = refs_resolve_ref_unsafe(&iter->refs->base,
							   iter->ref.refname,
							   RESOLVE_REF_READING,
							   &iter->oid, &flags);
			if (!referent)
				oidclr(&iter->oid, refs->base.repo->hash_algo);
			break;
		default:
			BUG("unhandled reference value type %d", iter->ref.value_type);
		}

		if (is_null_oid(&iter->oid))
			flags |= REF_ISBROKEN;

		if (check_refname_format(iter->ref.refname, REFNAME_ALLOW_ONELEVEL)) {
			if (!refname_is_safe(iter->ref.refname))
				die(_("refname is dangerous: %s"), iter->ref.refname);
			oidclr(&iter->oid, refs->base.repo->hash_algo);
			flags |= REF_BAD_NAME | REF_ISBROKEN;
		}

		if (iter->flags & DO_FOR_EACH_OMIT_DANGLING_SYMREFS &&
		    flags & REF_ISSYMREF &&
		    flags & REF_ISBROKEN)
			continue;

		if (!(iter->flags & DO_FOR_EACH_INCLUDE_BROKEN) &&
		    !ref_resolves_to_object(iter->ref.refname, refs->base.repo,
					    &iter->oid, flags))
				continue;

		iter->base.refname = iter->ref.refname;
		iter->base.referent = referent;
		iter->base.oid = &iter->oid;
		iter->base.flags = flags;

		break;
	}

	if (iter->err > 0)
		return ITER_DONE;
	if (iter->err < 0)
		return ITER_ERROR;
	return ITER_OK;
}

/*
 * Position the iterator at `refname`.
 *
 * Any prefix set by an earlier seek is dropped. When `flags` contains
 * REF_ITERATOR_SEEK_SET_PREFIX, `refname` additionally becomes the prefix
 * that all yielded references have to match.
 *
 * Returns the result of seeking the underlying reftable iterator, which is
 * also stored as the iterator's error state.
 */
static int reftable_ref_iterator_seek(struct ref_iterator *ref_iterator,
				      const char *refname, unsigned int flags)
{
	struct reftable_ref_iterator *iter =
		(struct reftable_ref_iterator *)ref_iterator;
	int set_prefix = !!(flags & REF_ITERATOR_SEEK_SET_PREFIX);

	FREE_AND_NULL(iter->prefix);
	iter->prefix_len = 0;

	if (set_prefix) {
		iter->prefix = xstrdup_or_null(refname);
		if (refname)
			iter->prefix_len = strlen(refname);
	}

	iter->err = reftable_iterator_seek_ref(&iter->iter, refname);
	return iter->err;
}

/*
 * Peel the current reference. Only records of type REFTABLE_REF_VAL2 carry a
 * peeled value; for those, `peeled` is set to the stored target value and 0
 * is returned. All other records yield -1 and leave `peeled` untouched.
 */
static int reftable_ref_iterator_peel(struct ref_iterator *ref_iterator,
				      struct object_id *peeled)
{
	struct reftable_ref_iterator *iter =
		(struct reftable_ref_iterator *)ref_iterator;

	if (iter->ref.value_type != REFTABLE_REF_VAL2)
		return -1;

	oidread(peeled, iter->ref.value.val2.target_value,
		iter->refs->base.repo->hash_algo);
	return 0;
}

/*
 * Free everything the iterator owns: the current record, the underlying
 * reftable iterator, the exclude patterns and the prefix. The iterator
 * structure itself is not freed here.
 */
static void reftable_ref_iterator_release(struct ref_iterator *ref_iterator)
{
	struct reftable_ref_iterator *iter =
		(struct reftable_ref_iterator *)ref_iterator;
	char **pattern;

	reftable_ref_record_release(&iter->ref);
	reftable_iterator_destroy(&iter->iter);

	/* The list is NULL-terminated and may be missing altogether. */
	for (pattern = iter->exclude_patterns; pattern && *pattern; pattern++)
		free(*pattern);
	free(iter->exclude_patterns);

	free(iter->prefix);
}

/*
 * Virtual function table that hooks the reftable ref iterator into the
 * generic ref iterator interface. It is installed on every iterator created
 * by `ref_iterator_for_stack()`.
 */
static struct ref_iterator_vtable reftable_ref_iterator_vtable = {
	.advance = reftable_ref_iterator_advance,
	.seek = reftable_ref_iterator_seek,
	.peel = reftable_ref_iterator_peel,
	.release = reftable_ref_iterator_release,
};

/*
 * Comparison callback for sorting an array of C strings. Both arguments
 * point at array elements, i.e. at `char *` values, which are compared with
 * strcmp().
 */
static int qsort_strcmp(const void *va, const void *vb)
{
	const char *const *lhs = va;
	const char *const *rhs = vb;

	return strcmp(*lhs, *rhs);
}

/*
 * Compute the subset of `exclude_patterns` that the iterator can handle.
 *
 * Patterns containing glob characters are dropped. The remaining ones are
 * duplicated, sorted and returned as a newly allocated, NULL-terminated
 * array. Returns NULL when `exclude_patterns` is NULL or when no pattern is
 * left after filtering.
 */
static char **filter_exclude_patterns(const char **exclude_patterns)
{
	size_t kept_nr = 0, kept_alloc = 0;
	char **kept = NULL;
	const char **cur;

	if (!exclude_patterns)
		return NULL;

	for (cur = exclude_patterns; *cur; cur++) {
		const char *c = *cur;

		/* Stop at the first glob character, if there is one. */
		while (*c && !is_glob_special(*c))
			c++;
		if (*c)
			continue;

		ALLOC_GROW(kept, kept_nr + 1, kept_alloc);
		kept[kept_nr++] = xstrdup(*cur);
	}

	if (!kept_nr)
		return kept;

	QSORT(kept, kept_nr, qsort_strcmp);
	ALLOC_GROW(kept, kept_nr + 1, kept_alloc);
	kept[kept_nr++] = NULL;

	return kept;
}

/*
 * Allocate a ref iterator over `stack` that is restricted to references
 * starting with `prefix` and that skips the supported subset of
 * `exclude_patterns`.
 *
 * An iterator is returned in all cases. When the ref store is in an error
 * state, or when reloading the stack, creating the underlying iterator or
 * seeking it fails, the error code is stored in the iterator's `err` field.
 */
static struct reftable_ref_iterator *ref_iterator_for_stack(struct reftable_ref_store *refs,
							    struct reftable_stack *stack,
							    const char *prefix,
							    const char **exclude_patterns,
							    int flags)
{
	struct reftable_ref_iterator *iter = xcalloc(1, sizeof(*iter));
	int ret;

	base_ref_iterator_init(&iter->base, &reftable_ref_iterator_vtable);
	iter->base.oid = &iter->oid;
	iter->refs = refs;
	iter->flags = flags;
	iter->exclude_patterns = filter_exclude_patterns(exclude_patterns);

	/* Each step only runs when all previous steps have succeeded. */
	ret = refs->err;
	if (!ret)
		ret = reftable_stack_reload(stack);
	if (!ret)
		ret = reftable_stack_init_ref_iterator(stack, &iter->iter);
	if (!ret)
		ret = reftable_ref_iterator_seek(&iter->base, prefix,
						 REF_ITERATOR_SEEK_SET_PREFIX);

	iter->err = ret;
	return iter;
}

/*
 * Start iterating over the references of the ref store that match `prefix`.
 *
 * Reading always requires REF_STORE_READ. Unless broken references were
 * requested via DO_FOR_EACH_INCLUDE_BROKEN, access to the object database
 * (REF_STORE_ODB) is required as well.
 *
 * Outside of a worktree the iterator over the main stack is returned. Inside
 * a worktree, the iterators over the worktree stack and the main stack are
 * merged into a single iterator.
 */
static struct ref_iterator *reftable_be_iterator_begin(struct ref_store *ref_store,
						       const char *prefix,
						       const char **exclude_patterns,
						       unsigned int flags)
{
	unsigned int required_flags = (flags & DO_FOR_EACH_INCLUDE_BROKEN) ?
		REF_STORE_READ : (REF_STORE_READ | REF_STORE_ODB);
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, required_flags, "ref_iterator_begin");
	struct reftable_ref_iterator *common_iter, *wt_iter;

	common_iter = ref_iterator_for_stack(refs, refs->main_backend.stack, prefix,
					     exclude_patterns, flags);

	/*
	 * A worktree stack only exists when the store was opened via a
	 * worktree, in which case the per-worktree refs get merged with the
	 * common refs. Otherwise the common refs are all there is.
	 */
	if (refs->worktree_backend.stack) {
		wt_iter = ref_iterator_for_stack(refs, refs->worktree_backend.stack,
						 prefix, exclude_patterns, flags);
		return merge_ref_iterator_begin(&wt_iter->base, &common_iter->base,
						ref_iterator_select, NULL);
	}

	return &common_iter->base;
}

/*
 * Read `refname` without resolving symbolic references.
 *
 * On success 0 is returned and either `oid` or `referent` plus `*type` are
 * filled in by `reftable_backend_read_ref()`. When the reference does not
 * exist, `*failure_errno` is set to ENOENT and -1 is returned. Any other
 * failure returns the negative error code of the store, of the backend
 * lookup or of the read.
 */
static int reftable_be_read_raw_ref(struct ref_store *ref_store,
				    const char *refname,
				    struct object_id *oid,
				    struct strbuf *referent,
				    unsigned int *type,
				    int *failure_errno)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_READ, "read_raw_ref");
	struct reftable_backend *backend;
	int ret;

	if (refs->err < 0)
		return refs->err;

	/* `refname` gets replaced by the name that is local to the stack. */
	ret = backend_for(&backend, refs, refname, &refname, 1);
	if (ret)
		return ret;

	ret = reftable_backend_read_ref(backend, refname, oid, referent, type);
	if (ret > 0) {
		/* A positive value means that there is no such reference. */
		*failure_errno = ENOENT;
		return -1;
	}

	return ret;
}

static int reftable_be_read_symbolic_ref(struct ref_store *ref_store,
					 const char *refname,
					 struct strbuf *referent)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_READ, "read_symbolic_ref");
	struct reftable_backend *be;
	struct object_id oid;
	unsigned int type = 0;
	int ret;

	ret = backend_for(&be, refs, refname, &refname, 1);
	if (ret)
		return ret;

	ret = reftable_backend_read_ref(be, refname, &oid, referent, &type);
	if (ret)
		ret = -1;
	else if (type == REF_ISSYMREF)
		; /* happy */
	else
		ret = NOT_A_SYMREF;
	return ret;
}

/*
 * A single reference update that has been queued for a reftable stack by
 * `queue_transaction_update()`.
 */
struct reftable_transaction_update {
	/* The generic reference update that this entry belongs to. */
	struct ref_update *update;
	/* Copy of the object ID that was passed in when queueing the update. */
	struct object_id current_oid;
};

/*
 * Per-stack state of a transaction. One instance exists for every backend
 * that is touched by the transaction; see `prepare_transaction_update()`.
 */
struct write_transaction_table_arg {
	/* The ref store that the transaction operates on. */
	struct reftable_ref_store *refs;
	/* The backend whose stack receives the updates below. */
	struct reftable_backend *be;
	/* Pending addition to the backend's stack; owned by this struct. */
	struct reftable_addition *addition;
	/* Array of queued updates with its used and allocated sizes. */
	struct reftable_transaction_update *updates;
	size_t updates_nr;
	size_t updates_alloc;
	/* Incremented once for each update prepared for this stack. */
	size_t updates_expected;
	/* Initialized to 0; NOTE(review): its use is outside of this excerpt. */
	uint64_t max_index;
};

/*
 * Backend-specific data of a transaction: the list of per-stack states.
 * Released via `free_transaction_data()`.
 */
struct reftable_transaction_data {
	/* Array with one entry per backend touched by the transaction. */
	struct write_transaction_table_arg *args;
	/* Number of used and allocated entries in `args`. */
	size_t args_nr, args_alloc;
};

static void free_transaction_data(struct reftable_transaction_data *tx_data)
{
	if (!tx_data)
		return;
	for (size_t i = 0; i < tx_data->args_nr; i++) {
		reftable_addition_destroy(tx_data->args[i].addition);
		free(tx_data->args[i].updates);
	}
	free(tx_data->args);
	free(tx_data);
}

/*
 * Prepare the per-stack transaction state for the given reference update.
 * The first update that touches a stack creates a new addition for it, which
 * locks the stack against concurrent modification. Later updates of the same
 * stack reuse that state.
 *
 * On success, 0 is returned, the stack's expected number of updates is
 * incremented and `*out` (if `out` is not NULL) is pointed at the state. On
 * failure, the error code is returned; when the stack could not be locked, a
 * message is appended to `err`.
 */
static int prepare_transaction_update(struct write_transaction_table_arg **out,
				      struct reftable_ref_store *refs,
				      struct reftable_transaction_data *tx_data,
				      struct ref_update *update,
				      struct strbuf *err)
{
	struct write_transaction_table_arg *arg = NULL;
	struct reftable_backend *be;
	size_t i;
	int ret;

	/*
	 * We are called once per reference update, so the stack is not
	 * reloaded here. The reload is instead requested when the addition
	 * for a not-yet-prepared backend gets created below.
	 */
	ret = backend_for(&be, refs, update->refname, NULL, 0);
	if (ret)
		return ret;

	/* Reuse the state of a stack that is already part of the transaction. */
	for (i = 0; i < tx_data->args_nr; i++) {
		if (tx_data->args[i].be == be) {
			arg = &tx_data->args[i];
			break;
		}
	}

	if (!arg) {
		struct reftable_addition *addition;

		ret = reftable_stack_new_addition(&addition, be->stack,
						  REFTABLE_STACK_NEW_ADDITION_RELOAD);
		if (ret == REFTABLE_LOCK_ERROR)
			strbuf_addstr(err, "cannot lock references");
		if (ret)
			return ret;

		ALLOC_GROW(tx_data->args, tx_data->args_nr + 1,
			   tx_data->args_alloc);
		arg = &tx_data->args[tx_data->args_nr++];

		arg->refs = refs;
		arg->be = be;
		arg->addition = addition;

		arg->updates = NULL;
		arg->updates_nr = 0;
		arg->updates_alloc = 0;
		arg->updates_expected = 0;
		arg->max_index = 0;
	}

	arg->updates_expected++;

	if (out)
		*out = arg;

	return 0;
}

/*
 * Queue a reference update for the correct stack. We potentially need to
 * handle multiple stack updates in a single transaction when it spans across
 * multiple worktrees.
 */
static int queue_transaction_update(struct reftable_ref_store *refs,
				    struct reftable_transaction_data *tx_data,
				    struct ref_update *update,
				    struct object_id *current_oid,
				    struct strbuf *err)
{
	struct write_transaction_table_arg *arg = NULL;
	int ret;

	if (update->backend_data)
		BUG("reference update queued more than once");

	ret = prepare_transaction_update(&arg, refs, tx_data, update, err);
	if (ret < 0)
		return ret;

	ALLOC_GROW(arg->updates, arg->updates_nr + 1,
		   arg->updates_alloc);
	arg->updates[arg->updates_nr].update = update;
	oidcpy(&arg->updates[arg->updates_nr].current_oid, current_oid);
	update->backend_data = &arg->updates[arg->updates_nr++];

	return 0;
}

/*
 * Validate a single reference update of `transaction` and, if it needs to be
 * written, queue it for the stack that backs its refname.
 *
 * `update_idx` is the position of `u` in the transaction; it is attached to
 * the entry appended to `refnames_to_check` for refs that do not exist yet.
 * `head_type` and `head_referent` describe "HEAD" as read by the caller and
 * are used to enqueue an extra log-only update for HEAD when its referent is
 * updated. `referent` is a scratch buffer that receives the symref target of
 * the updated ref, if any. The incoming value of `be` is ignored: it is
 * overwritten with the backend looked up for `u->refname`.
 *
 * This function may add further updates to `transaction` (for HEAD's reflog
 * and for the referent of a dereferenced symref).
 *
 * Returns 0 on success, or one of the `REF_TRANSACTION_ERROR_*` codes with a
 * message appended to `err` where one is available.
 */
static enum ref_transaction_error prepare_single_update(struct reftable_ref_store *refs,
							struct reftable_transaction_data *tx_data,
							struct ref_transaction *transaction,
							struct reftable_backend *be,
							struct ref_update *u,
							size_t update_idx,
							struct string_list *refnames_to_check,
							unsigned int head_type,
							struct strbuf *head_referent,
							struct strbuf *referent,
							struct strbuf *err)
{
	enum ref_transaction_error ret = 0;
	struct object_id current_oid = {0};
	const char *rewritten_ref;

	/*
	 * There is no need to reload the respective backends here as
	 * we have already reloaded them when preparing the transaction
	 * update. And given that the stacks have been locked there
	 * shouldn't have been any concurrent modifications of the
	 * stack.
	 */
	ret = backend_for(&be, refs, u->refname, &rewritten_ref, 0);
	if (ret)
		return REF_TRANSACTION_ERROR_GENERIC;

	/*
	 * Reflog-only updates with caller-provided object IDs bypass all
	 * verification below, but they must be fully specified.
	 */
	if (u->flags & REF_LOG_USE_PROVIDED_OIDS) {
		if (!(u->flags & REF_HAVE_OLD) ||
		    !(u->flags & REF_HAVE_NEW) ||
		    !(u->flags & REF_LOG_ONLY)) {
			strbuf_addf(err, _("trying to write reflog for '%s' "
					   "with incomplete values"), u->refname);
			return REF_TRANSACTION_ERROR_GENERIC;
		}

		if (queue_transaction_update(refs, tx_data, u, &u->old_oid, err))
			return REF_TRANSACTION_ERROR_GENERIC;
		return 0;
	}

	/* Verify that the new object ID is valid. */
	if ((u->flags & REF_HAVE_NEW) && !is_null_oid(&u->new_oid) &&
	    !(u->flags & REF_SKIP_OID_VERIFICATION) &&
	    !(u->flags & REF_LOG_ONLY)) {
		struct object *o = parse_object(refs->base.repo, &u->new_oid);
		if (!o) {
			strbuf_addf(err,
				    _("trying to write ref '%s' with nonexistent object %s"),
				    u->refname, oid_to_hex(&u->new_oid));
			return REF_TRANSACTION_ERROR_INVALID_NEW_VALUE;
		}

		if (o->type != OBJ_COMMIT && is_branch(u->refname)) {
			strbuf_addf(err, _("trying to write non-commit object %s to branch '%s'"),
				    oid_to_hex(&u->new_oid), u->refname);
			return REF_TRANSACTION_ERROR_INVALID_NEW_VALUE;
		}
	}

	/*
	 * When we update the reference that HEAD points to we enqueue
	 * a second log-only update for HEAD so that its reflog is
	 * updated accordingly.
	 */
	if (head_type == REF_ISSYMREF &&
	    !(u->flags & REF_LOG_ONLY) &&
	    !(u->flags & REF_UPDATE_VIA_HEAD) &&
	    !strcmp(rewritten_ref, head_referent->buf)) {
		/*
		 * First make sure that HEAD is not already in the
		 * transaction. This check is O(lg N) in the transaction
		 * size, but it happens at most once per transaction.
		 */
		if (string_list_has_string(&transaction->refnames, "HEAD")) {
			/* An entry already existed */
			strbuf_addf(err,
				    _("multiple updates for 'HEAD' (including one "
				      "via its referent '%s') are not allowed"),
				    u->refname);
			return REF_TRANSACTION_ERROR_NAME_CONFLICT;
		}

		ref_transaction_add_update(
			transaction, "HEAD",
			u->flags | REF_LOG_ONLY | REF_NO_DEREF,
			&u->new_oid, &u->old_oid, NULL, NULL, NULL,
			u->msg);
	}

	/*
	 * Read the current state of the reference. A positive return value
	 * means that the reference does not exist.
	 */
	ret = reftable_backend_read_ref(be, rewritten_ref,
					&current_oid, referent, &u->type);
	if (ret < 0)
		return REF_TRANSACTION_ERROR_GENERIC;
	if (ret > 0 && !ref_update_expects_existing_old_ref(u)) {
		struct string_list_item *item;
		/*
		 * The reference does not exist, and we either have no
		 * old object ID or expect the reference to not exist.
		 * We can thus skip below safety checks as well as the
		 * symref splitting. But we do want to verify that
		 * there is no conflicting reference here so that we
		 * can output a proper error message instead of failing
		 * at a later point.
		 */
		item = string_list_append(refnames_to_check, u->refname);
		item->util = xmalloc(sizeof(update_idx));
		memcpy(item->util, &update_idx, sizeof(update_idx));

		/*
		 * There is no need to write the reference deletion
		 * when the reference in question doesn't exist.
		 */
		if ((u->flags & REF_HAVE_NEW) && !ref_update_has_null_new_value(u)) {
			ret = queue_transaction_update(refs, tx_data, u,
						       &current_oid, err);
			if (ret)
				return REF_TRANSACTION_ERROR_GENERIC;
		}

		return 0;
	}
	if (ret > 0) {
		/* The reference does not exist, but we expected it to. */
		strbuf_addf(err, _("cannot lock ref '%s': "
				   "unable to resolve reference '%s'"),
			    ref_update_original_update_refname(u), u->refname);
		return REF_TRANSACTION_ERROR_NONEXISTENT_REF;
	}

	if (u->type & REF_ISSYMREF) {
		/*
		 * The reftable stack is locked at this point already,
		 * so it is safe to call `refs_resolve_ref_unsafe()`
		 * here without causing races.
		 */
		const char *resolved = refs_resolve_ref_unsafe(&refs->base, u->refname, 0,
							       &current_oid, NULL);

		if (u->flags & REF_NO_DEREF) {
			if (u->flags & REF_HAVE_OLD && !resolved) {
				strbuf_addf(err, _("cannot lock ref '%s': "
						   "error reading reference"), u->refname);
				return REF_TRANSACTION_ERROR_GENERIC;
			}
		} else {
			struct ref_update *new_update;
			int new_flags;

			new_flags = u->flags;
			if (!strcmp(rewritten_ref, "HEAD"))
				new_flags |= REF_UPDATE_VIA_HEAD;

			if (string_list_has_string(&transaction->refnames, referent->buf)) {
				strbuf_addf(err,
					    _("multiple updates for '%s' (including one "
					      "via symref '%s') are not allowed"),
					    referent->buf, u->refname);
				return REF_TRANSACTION_ERROR_NAME_CONFLICT;
			}

			/*
			 * If we are updating a symref (eg. HEAD), we should also
			 * update the branch that the symref points to.
			 *
			 * This is generic functionality, and would be better
			 * done in refs.c, but the current implementation is
			 * intertwined with the locking in files-backend.c.
			 */
			new_update = ref_transaction_add_update(
				transaction, referent->buf, new_flags,
				u->new_target ? NULL : &u->new_oid,
				u->old_target ? NULL : &u->old_oid,
				u->new_target, u->old_target,
				u->committer_info, u->msg);

			new_update->parent_update = u;

			/* Change the symbolic ref update to log only. */
			u->flags |= REF_LOG_ONLY | REF_NO_DEREF;
		}
	}

	/*
	 * Verify that the old object matches our expectations. Note
	 * that the error messages here do not make a lot of sense in
	 * the context of the reftable backend as we never lock
	 * individual refs. But the error messages match what the files
	 * backend returns, which keeps our tests happy.
	 */
	if (u->old_target) {
		if (!(u->type & REF_ISSYMREF)) {
			strbuf_addf(err, _("cannot lock ref '%s': "
					   "expected symref with target '%s': "
					   "but is a regular ref"),
				    ref_update_original_update_refname(u),
				    u->old_target);
			return REF_TRANSACTION_ERROR_EXPECTED_SYMREF;
		}

		ret = ref_update_check_old_target(referent->buf, u, err);
		if (ret)
			return ret;
	} else if ((u->flags & (REF_LOG_ONLY | REF_HAVE_OLD)) == REF_HAVE_OLD) {
		if (oideq(&current_oid, &u->old_oid)) {
			/*
			 * Normally matching the expected old oid is enough. Either we
			 * found the ref at the expected state, or we are creating and
			 * expect the null oid (and likewise found nothing).
			 *
			 * But there is one exception for the null oid: if we found a
			 * symref pointing to nothing we'll also get the null oid. In
			 * regular recursive mode, that's good (we'll write to what the
			 * symref points to, which doesn't exist). But in no-deref
			 * mode, it means we'll clobber the symref, even though the
			 * caller asked for this to be a creation event. So flag
			 * that case to preserve the dangling symref.
			 *
			 * Everything else is OK and we can fall through to the
			 * end of the conditional chain.
			 */
			if ((u->flags & REF_NO_DEREF) &&
			    referent->len &&
			    is_null_oid(&u->old_oid)) {
				strbuf_addf(err, _("cannot lock ref '%s': "
					    "dangling symref already exists"),
					    ref_update_original_update_refname(u));
				return REF_TRANSACTION_ERROR_CREATE_EXISTS;
			}
		} else if (is_null_oid(&u->old_oid)) {
			strbuf_addf(err, _("cannot lock ref '%s': "
					   "reference already exists"),
				    ref_update_original_update_refname(u));
			return REF_TRANSACTION_ERROR_CREATE_EXISTS;
		} else if (is_null_oid(&current_oid)) {
			strbuf_addf(err, _("cannot lock ref '%s': "
					   "reference is missing but expected %s"),
				    ref_update_original_update_refname(u),
				    oid_to_hex(&u->old_oid));
			return REF_TRANSACTION_ERROR_NONEXISTENT_REF;
		} else {
			strbuf_addf(err, _("cannot lock ref '%s': "
					   "is at %s but expected %s"),
				    ref_update_original_update_refname(u),
				    oid_to_hex(&current_oid),
				    oid_to_hex(&u->old_oid));
			return REF_TRANSACTION_ERROR_INCORRECT_OLD_VALUE;
		}
	}

	/*
	 * If all of the following conditions are true:
	 *
	 *   - We're not about to write a symref.
	 *   - We're not about to write a log-only entry.
	 *   - Old and new object ID are the same (or there is no new value).
	 *
	 * Then we're essentially doing a no-op update that can be
	 * skipped. This is not only for the sake of efficiency, but
	 * also skips writing unneeded reflog entries.
	 */
	if ((u->type & REF_ISSYMREF) ||
	    (u->flags & REF_LOG_ONLY) ||
	    (u->flags & REF_HAVE_NEW && !oideq(&current_oid, &u->new_oid)))
		if (queue_transaction_update(refs, tx_data, u, &current_oid, err))
			return REF_TRANSACTION_ERROR_GENERIC;

	return 0;
}

/*
 * Prepare `transaction` for the reftable backend.
 *
 * This sets up (and thereby locks) a stack update for every stack touched by
 * the transaction, validates each update via `prepare_single_update()` and
 * finally verifies that the names of to-be-created refs are available.
 *
 * On success the per-transaction data is stored in
 * `transaction->backend_data`, the state is set to
 * `REF_TRANSACTION_PREPARED` and 0 is returned. On a negative error the
 * transaction data is freed, the state is set to `REF_TRANSACTION_CLOSED`
 * and `err` receives a message unless one has been set already.
 */
static int reftable_be_transaction_prepare(struct ref_store *ref_store,
					   struct ref_transaction *transaction,
					   struct strbuf *err)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_WRITE|REF_STORE_MAIN, "ref_transaction_prepare");
	struct strbuf referent = STRBUF_INIT, head_referent = STRBUF_INIT;
	struct string_list refnames_to_check = STRING_LIST_INIT_NODUP;
	struct reftable_transaction_data *tx_data = NULL;
	struct reftable_backend *be;
	struct object_id head_oid;
	unsigned int head_type = 0;
	size_t i;
	int ret;

	/* Bail out early if the ref store itself is in an error state. */
	ret = refs->err;
	if (ret < 0)
		goto done;

	tx_data = xcalloc(1, sizeof(*tx_data));

	/*
	 * Preprocess all updates. For one we check that there are no duplicate
	 * reference updates in this transaction. Second, we lock all stacks
	 * that will be modified during the transaction.
	 */
	for (i = 0; i < transaction->nr; i++) {
		ret = prepare_transaction_update(NULL, refs, tx_data,
						 transaction->updates[i], err);
		if (ret)
			goto done;
	}

	/*
	 * Now that we have counted updates per stack we can preallocate their
	 * arrays. This avoids having to reallocate many times.
	 */
	for (i = 0; i < tx_data->args_nr; i++) {
		CALLOC_ARRAY(tx_data->args[i].updates, tx_data->args[i].updates_expected);
		tx_data->args[i].updates_alloc = tx_data->args[i].updates_expected;
	}

	/*
	 * TODO: it's dubious whether we should reload the stack that "HEAD"
	 * belongs to or not. In theory, it may happen that we only modify
	 * stacks which are _not_ part of the "HEAD" stack. In that case we
	 * wouldn't have prepared any transaction for its stack and would not
	 * have reloaded it, which may mean that it is stale.
	 *
	 * On the other hand, reloading that stack without locking it feels
	 * wrong, too, as the value of "HEAD" could be modified concurrently at
	 * any point in time.
	 */
	ret = backend_for(&be, refs, "HEAD", NULL, 0);
	if (ret)
		goto done;

	/*
	 * Only a negative return value is treated as an error; a positive one
	 * is reset to 0 and `head_type` keeps its initial value of 0.
	 */
	ret = reftable_backend_read_ref(be, "HEAD", &head_oid,
					&head_referent, &head_type);
	if (ret < 0)
		goto done;
	ret = 0;

	/*
	 * Validate and queue every update. `transaction->nr` is re-read on
	 * each iteration because `prepare_single_update()` may add further
	 * updates to the transaction (presumably growing `nr`).
	 */
	for (i = 0; i < transaction->nr; i++) {
		ret = prepare_single_update(refs, tx_data, transaction, be,
					    transaction->updates[i], i,
					    &refnames_to_check, head_type,
					    &head_referent, &referent, err);
		if (ret) {
			/*
			 * If the failure could be recorded as a per-update
			 * rejection, drop the error message and carry on with
			 * the remaining updates.
			 */
			if (ref_transaction_maybe_set_rejected(transaction, i, ret)) {
				strbuf_reset(err);
				ret = 0;

				continue;
			}
			goto done;
		}
	}

	ret = refs_verify_refnames_available(ref_store, &refnames_to_check,
					     &transaction->refnames, NULL,
					     transaction,
					     transaction->flags & REF_TRANSACTION_FLAG_INITIAL,
					     err);
	if (ret < 0)
		goto done;

	transaction->backend_data = tx_data;
	transaction->state = REF_TRANSACTION_PREPARED;

done:
	if (ret < 0) {
		free_transaction_data(tx_data);
		transaction->state = REF_TRANSACTION_CLOSED;
		if (!err->len)
			strbuf_addf(err, _("reftable: transaction prepare: %s"),
				    reftable_error_str(ret));
	}
	strbuf_release(&referent);
	strbuf_release(&head_referent);
	/* The second argument of 1 also frees the `util` pointers. */
	string_list_clear(&refnames_to_check, 1);

	return ret;
}

/*
 * Abort a transaction: release the backend's per-transaction data and mark
 * the transaction as closed. Always returns 0.
 */
static int reftable_be_transaction_abort(struct ref_store *ref_store UNUSED,
					 struct ref_transaction *transaction,
					 struct strbuf *err UNUSED)
{
	free_transaction_data(transaction->backend_data);
	transaction->state = REF_TRANSACTION_CLOSED;

	return 0;
}

static int transaction_update_cmp(const void *a, const void *b)
{
	struct reftable_transaction_update *update_a = (struct reftable_transaction_update *)a;
	struct reftable_transaction_update *update_b = (struct reftable_transaction_update *)b;

	/*
	 * If there is an index set, it should take preference (default is 0).
	 * This ensures that updates with indexes are sorted amongst themselves.
	 */
	if (update_a->update->index || update_b->update->index)
		return update_a->update->index - update_b->update->index;

	return strcmp(update_a->update->refname, update_b->update->refname);
}

/*
 * Table-writer callback that writes all queued updates of one stack update.
 *
 * `cb_data` is the `struct write_transaction_table_arg` of the stack. The
 * queued updates are sorted, ref records are written as they are visited and
 * log records are collected in `logs` and written in one go at the end.
 * Updates that carry a rejection error are skipped.
 *
 * Returns 0 on success and a negative reftable error code otherwise.
 */
static int write_transaction_table(struct reftable_writer *writer, void *cb_data)
{
	struct write_transaction_table_arg *arg = cb_data;
	uint64_t ts = reftable_stack_next_update_index(arg->be->stack);
	struct reftable_log_record *logs = NULL;
	struct ident_split committer_ident = {0};
	size_t logs_nr = 0, logs_alloc = 0, i;
	const char *committer_info;
	int ret = 0;

	/* Default committer identity for updates that do not bring their own. */
	committer_info = git_committer_info(0);
	if (split_ident_line(&committer_ident, committer_info, strlen(committer_info)))
		BUG("failed splitting committer info");

	QSORT(arg->updates, arg->updates_nr, transaction_update_cmp);

	/*
	 * During reflog migration, we add indexes for a single reflog with
	 * multiple entries. Each entry will contain a different update_index,
	 * so set the limits accordingly.
	 */
	ret = reftable_writer_set_limits(writer, ts, ts + arg->max_index);
	if (ret < 0)
		goto done;

	for (i = 0; i < arg->updates_nr; i++) {
		struct reftable_transaction_update *tx_update = &arg->updates[i];
		struct ref_update *u = tx_update->update;

		if (u->rejection_err)
			continue;

		/*
		 * Write a reflog entry when updating a ref to point to
		 * something new in any of the following cases:
		 *
		 * - The reference is about to be deleted. We always want to
		 *   delete the reflog in that case.
		 * - REF_FORCE_CREATE_REFLOG is set, asking us to always create
		 *   the reflog entry.
		 * - `core.logAllRefUpdates` tells us to create the reflog for
		 *   the given ref.
		 */
		if ((u->flags & REF_HAVE_NEW) &&
		    !(u->type & REF_ISSYMREF) &&
		    ref_update_has_null_new_value(u)) {
			struct reftable_log_record log = {0};
			struct reftable_iterator it = {0};

			ret = reftable_stack_init_log_iterator(arg->be->stack, &it);
			if (ret < 0)
				goto done;

			/*
			 * When deleting refs we also delete all reflog entries
			 * with them. While it is not strictly required to
			 * delete reflogs together with their refs, this
			 * matches the behaviour of the files backend.
			 *
			 * Unfortunately, we have no better way than to delete
			 * all reflog entries one by one.
			 */
			ret = reftable_iterator_seek_log(&it, u->refname);
			while (ret == 0) {
				struct reftable_log_record *tombstone;

				ret = reftable_iterator_next_log(&it, &log);
				if (ret < 0)
					break;
				/*
				 * Stop once the iterator is exhausted or has
				 * moved on to the logs of a different ref.
				 */
				if (ret > 0 || strcmp(log.refname, u->refname)) {
					ret = 0;
					break;
				}

				/*
				 * Only the fields relevant to a deletion
				 * record are set on the new array slot.
				 */
				ALLOC_GROW(logs, logs_nr + 1, logs_alloc);
				tombstone = &logs[logs_nr++];
				tombstone->refname = xstrdup(u->refname);
				tombstone->value_type = REFTABLE_LOG_DELETION;
				tombstone->update_index = log.update_index;
			}

			reftable_log_record_release(&log);
			reftable_iterator_destroy(&it);

			if (ret)
				goto done;
		} else if (!(u->flags & REF_SKIP_CREATE_REFLOG) &&
			   (u->flags & REF_HAVE_NEW) &&
			   (u->flags & REF_FORCE_CREATE_REFLOG ||
			    should_write_log(arg->refs, u->refname))) {
			struct reftable_log_record *log;
			int create_reflog = 1;

			/*
			 * For symref updates the logged new object ID is the
			 * one the new target resolves to.
			 */
			if (u->new_target) {
				if (!refs_resolve_ref_unsafe(&arg->refs->base, u->new_target,
							     RESOLVE_REF_READING, &u->new_oid, NULL)) {
					/*
					 * TODO: currently we skip creating reflogs for dangling
					 * symref updates. It would be nice to capture this as
					 * zero oid updates however.
					 */
					create_reflog = 0;
				}
			}

			if (create_reflog) {
				struct ident_split c;

				ALLOC_GROW(logs, logs_nr + 1, logs_alloc);
				log = &logs[logs_nr++];
				memset(log, 0, sizeof(*log));

				/* Prefer the committer info attached to the update. */
				if (u->committer_info) {
					if (split_ident_line(&c, u->committer_info,
							     strlen(u->committer_info)))
						BUG("failed splitting committer info");
				} else {
					c = committer_ident;
				}

				fill_reftable_log_record(log, &c);

				/*
				 * Updates are sorted by the writer. So updates for the same
				 * refname need to contain different update indices.
				 */
				log->update_index = ts + u->index;

				log->refname = xstrdup(u->refname);
				memcpy(log->value.update.new_hash,
				       u->new_oid.hash, GIT_MAX_RAWSZ);
				memcpy(log->value.update.old_hash,
				       tx_update->current_oid.hash, GIT_MAX_RAWSZ);
				/* The message is capped at half the block size. */
				log->value.update.message =
					xstrndup(u->msg, arg->refs->write_options.block_size / 2);
			}
		}

		/* Log-only updates do not touch the ref itself. */
		if (u->flags & REF_LOG_ONLY)
			continue;

		if (u->new_target) {
			/* Write a symbolic ref pointing at the new target. */
			struct reftable_ref_record ref = {
				.refname = (char *)u->refname,
				.value_type = REFTABLE_REF_SYMREF,
				.value.symref = (char *)u->new_target,
				.update_index = ts,
			};

			ret = reftable_writer_add_ref(writer, &ref);
			if (ret < 0)
				goto done;
		} else if ((u->flags & REF_HAVE_NEW) && ref_update_has_null_new_value(u)) {
			/* Write a deletion record for the ref. */
			struct reftable_ref_record ref = {
				.refname = (char *)u->refname,
				.update_index = ts,
				.value_type = REFTABLE_REF_DELETION,
			};

			ret = reftable_writer_add_ref(writer, &ref);
			if (ret < 0)
				goto done;
		} else if (u->flags & REF_HAVE_NEW) {
			struct reftable_ref_record ref = {0};
			struct object_id peeled;
			int peel_error;

			ref.refname = (char *)u->refname;
			ref.update_index = ts;

			/*
			 * Store both the object ID and its peeled value when
			 * peeling succeeds, and only the object ID otherwise.
			 */
			peel_error = peel_object(arg->refs->base.repo, &u->new_oid, &peeled);
			if (!peel_error) {
				ref.value_type = REFTABLE_REF_VAL2;
				memcpy(ref.value.val2.target_value, peeled.hash, GIT_MAX_RAWSZ);
				memcpy(ref.value.val2.value, u->new_oid.hash, GIT_MAX_RAWSZ);
			} else if (!is_null_oid(&u->new_oid)) {
				ref.value_type = REFTABLE_REF_VAL1;
				memcpy(ref.value.val1, u->new_oid.hash, GIT_MAX_RAWSZ);
			}

			ret = reftable_writer_add_ref(writer, &ref);
			if (ret < 0)
				goto done;
		}
	}

	/*
	 * Logs are written at the end so that we do not have intermixed ref
	 * and log blocks.
	 */
	if (logs) {
		ret = reftable_writer_add_logs(writer, logs, logs_nr);
		if (ret < 0)
			goto done;
	}

done:
	assert(ret != REFTABLE_API_ERROR);
	for (i = 0; i < logs_nr; i++)
		reftable_log_record_release(&logs[i]);
	free(logs);
	return ret;
}

/*
 * Commit a prepared transaction: for every stack touched by the transaction
 * write its queued updates into a new table and commit the addition. The
 * per-transaction data is released and the transaction closed regardless of
 * the outcome.
 *
 * Returns 0 on success. On failure a message is appended to `err` and -1 is
 * returned.
 */
static int reftable_be_transaction_finish(struct ref_store *ref_store UNUSED,
					  struct ref_transaction *transaction,
					  struct strbuf *err)
{
	struct reftable_transaction_data *tx_data = transaction->backend_data;
	int ret = 0;
	size_t i;

	for (i = 0; i < tx_data->args_nr; i++) {
		struct write_transaction_table_arg *arg = &tx_data->args[i];

		arg->max_index = transaction->max_index;

		ret = reftable_addition_add(arg->addition,
					    write_transaction_table, arg);
		if (ret >= 0)
			ret = reftable_addition_commit(arg->addition);
		if (ret < 0)
			break;
	}

	assert(ret != REFTABLE_API_ERROR);
	free_transaction_data(tx_data);
	transaction->state = REF_TRANSACTION_CLOSED;

	if (!ret)
		return 0;

	strbuf_addf(err, _("reftable: transaction failure: %s"),
		    reftable_error_str(ret));
	return -1;
}

static int reftable_be_pack_refs(struct ref_store *ref_store,
				 struct pack_refs_opts *opts)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_WRITE | REF_STORE_ODB, "pack_refs");
	struct reftable_stack *stack;
	int ret;

	if (refs->err)
		return refs->err;

	stack = refs->worktree_backend.stack;
	if (!stack)
		stack = refs->main_backend.stack;

	if (opts->flags & PACK_REFS_AUTO)
		ret = reftable_stack_auto_compact(stack);
	else
		ret = reftable_stack_compact_all(stack, NULL);
	if (ret < 0) {
		ret = error(_("unable to compact stack: %s"),
			    reftable_error_str(ret));
		goto out;
	}

	ret = reftable_stack_clean(stack);
	if (ret)
		goto out;

out:
	return ret;
}

/*
 * Optimize the ref store. For the reftable backend this simply forwards to
 * `reftable_be_pack_refs()` with the same arguments and returns its result.
 */
static int reftable_be_optimize(struct ref_store *ref_store,
				struct pack_refs_opts *opts)
{
	return reftable_be_pack_refs(ref_store, opts);
}

/*
 * NOTE(review): no user of this struct is visible in this part of the file.
 * Judging by its name and fields it is presumably the payload of a table
 * writer callback that creates a symbolic ref -- confirm against its users.
 */
struct write_create_symref_arg {
	struct reftable_ref_store *refs;
	struct reftable_stack *stack;
	struct strbuf *err;
	const char *refname;
	const char *target;
	const char *logmsg;
};

/*
 * Payload passed to `write_copy_table()` when copying or renaming a ref.
 */
struct write_copy_arg {
	/* Ref store; used to verify refname availability and for write options. */
	struct reftable_ref_store *refs;
	/* Backend whose stack the old ref and its reflog are read from. */
	struct reftable_backend *be;
	/* Name of the existing reference. */
	const char *oldname;
	/* Name of the reference to create. */
	const char *newname;
	/* Message recorded in the newly written reflog entries. */
	const char *logmsg;
	/* Non-zero for a rename: the old ref and its reflog get deleted. */
	int delete_old;
};

/*
 * Table-writer callback that copies a reference together with its reflog to
 * a new name. `cb_data` is a `struct write_copy_arg`; when its `delete_old`
 * member is set the old reference and its reflog entries are deleted as
 * well, which turns the copy into a rename.
 *
 * Symbolic refs cannot be copied. Copying a ref onto itself is a no-op.
 * Returns 0 on success and a negative value on error.
 */
static int write_copy_table(struct reftable_writer *writer, void *cb_data)
{
	struct write_copy_arg *arg = cb_data;
	uint64_t deletion_ts, creation_ts;
	struct reftable_ref_record old_ref = {0}, refs[2] = {0};
	struct reftable_log_record old_log = {0}, *logs = NULL;
	struct reftable_iterator it = {0};
	struct string_list skip = STRING_LIST_INIT_NODUP;
	struct ident_split committer_ident = {0};
	struct strbuf errbuf = STRBUF_INIT;
	size_t logs_nr = 0, logs_alloc = 0, i;
	const char *committer_info;
	int ret;

	committer_info = git_committer_info(0);
	if (split_ident_line(&committer_ident, committer_info, strlen(committer_info)))
		BUG("failed splitting committer info");

	/* Any non-zero result is reported as the old refname not being found. */
	if (reftable_stack_read_ref(arg->be->stack, arg->oldname, &old_ref)) {
		ret = error(_("refname %s not found"), arg->oldname);
		goto done;
	}
	if (old_ref.value_type == REFTABLE_REF_SYMREF) {
		ret = error(_("refname %s is a symbolic ref, copying it is not supported"),
			    arg->oldname);
		goto done;
	}

	/*
	 * There's nothing to do in case the old and new name are the same, so
	 * we exit early in that case.
	 */
	if (!strcmp(arg->oldname, arg->newname)) {
		ret = 0;
		goto done;
	}

	/*
	 * Verify that the new refname is available. When renaming, the old
	 * name is passed in the skip list.
	 */
	if (arg->delete_old)
		string_list_insert(&skip, arg->oldname);
	ret = refs_verify_refname_available(&arg->refs->base, arg->newname,
					    NULL, &skip, 0, &errbuf);
	if (ret < 0) {
		error("%s", errbuf.buf);
		goto done;
	}

	/*
	 * When deleting the old reference we have to use two update indices:
	 * once to delete the old ref and its reflog, and once to create the
	 * new ref and its reflog. They need to be staged with two separate
	 * indices because the new reflog needs to encode both the deletion of
	 * the old branch and the creation of the new branch, and we cannot do
	 * two changes to a reflog in a single update.
	 */
	deletion_ts = creation_ts = reftable_stack_next_update_index(arg->be->stack);
	if (arg->delete_old)
		creation_ts++;
	ret = reftable_writer_set_limits(writer, deletion_ts, creation_ts);
	if (ret < 0)
		goto done;

	/*
	 * Add the new reference. If this is a rename then we also delete the
	 * old reference.
	 *
	 * Note that `refs[0]` is a shallow copy of `old_ref` with a freshly
	 * allocated refname.
	 */
	refs[0] = old_ref;
	refs[0].refname = xstrdup(arg->newname);
	refs[0].update_index = creation_ts;
	if (arg->delete_old) {
		refs[1].refname = xstrdup(arg->oldname);
		refs[1].value_type = REFTABLE_REF_DELETION;
		refs[1].update_index = deletion_ts;
	}
	ret = reftable_writer_add_refs(writer, refs, arg->delete_old ? 2 : 1);
	if (ret < 0)
		goto done;

	/*
	 * When deleting the old branch we need to create a reflog entry on the
	 * new branch name that indicates that the old branch has been deleted
	 * and then recreated. This is a tad weird, but matches what the files
	 * backend does.
	 */
	if (arg->delete_old) {
		struct strbuf head_referent = STRBUF_INIT;
		struct object_id head_oid;
		int append_head_reflog;
		unsigned head_type = 0;

		/* Entry recording the deletion: only the old hash is set. */
		ALLOC_GROW(logs, logs_nr + 1, logs_alloc);
		memset(&logs[logs_nr], 0, sizeof(logs[logs_nr]));
		fill_reftable_log_record(&logs[logs_nr], &committer_ident);
		logs[logs_nr].refname = xstrdup(arg->newname);
		logs[logs_nr].update_index = deletion_ts;
		logs[logs_nr].value.update.message =
			xstrndup(arg->logmsg, arg->refs->write_options.block_size / 2);
		memcpy(logs[logs_nr].value.update.old_hash, old_ref.value.val1, GIT_MAX_RAWSZ);
		logs_nr++;

		ret = reftable_backend_read_ref(arg->be, "HEAD", &head_oid,
						&head_referent, &head_type);
		if (ret < 0)
			goto done;
		append_head_reflog = (head_type & REF_ISSYMREF) && !strcmp(head_referent.buf, arg->oldname);
		strbuf_release(&head_referent);

		/*
		 * The files backend uses `refs_delete_ref()` to delete the old
		 * branch name, which will append a reflog entry for HEAD in
		 * case it points to the old branch.
		 */
		if (append_head_reflog) {
			/*
			 * Start from a shallow copy of the deletion entry and
			 * duplicate every owned string so that both records
			 * can be released independently.
			 */
			ALLOC_GROW(logs, logs_nr + 1, logs_alloc);
			logs[logs_nr] = logs[logs_nr - 1];
			logs[logs_nr].refname = xstrdup("HEAD");
			logs[logs_nr].value.update.name =
				xstrdup(logs[logs_nr].value.update.name);
			logs[logs_nr].value.update.email =
				xstrdup(logs[logs_nr].value.update.email);
			logs[logs_nr].value.update.message =
				xstrdup(logs[logs_nr].value.update.message);
			logs_nr++;
		}
	}

	/*
	 * Create the reflog entry for the newly created branch.
	 */
	ALLOC_GROW(logs, logs_nr + 1, logs_alloc);
	memset(&logs[logs_nr], 0, sizeof(logs[logs_nr]));
	fill_reftable_log_record(&logs[logs_nr], &committer_ident);
	logs[logs_nr].refname = xstrdup(arg->newname);
	logs[logs_nr].update_index = creation_ts;
	logs[logs_nr].value.update.message =
		xstrndup(arg->logmsg, arg->refs->write_options.block_size / 2);
	memcpy(logs[logs_nr].value.update.new_hash, old_ref.value.val1, GIT_MAX_RAWSZ);
	logs_nr++;

	/*
	 * In addition to writing the reflog entry for the new branch, we also
	 * copy over all log entries from the old reflog. Last but not least,
	 * when renaming we also have to delete all the old reflog entries.
	 */
	ret = reftable_stack_init_log_iterator(arg->be->stack, &it);
	if (ret < 0)
		goto done;

	ret = reftable_iterator_seek_log(&it, arg->oldname);
	if (ret < 0)
		goto done;

	while (1) {
		ret = reftable_iterator_next_log(&it, &old_log);
		if (ret < 0)
			goto done;
		/*
		 * Stop once the iterator is exhausted or has moved on to the
		 * logs of a different ref.
		 */
		if (ret > 0 || strcmp(old_log.refname, arg->oldname)) {
			ret = 0;
			break;
		}

		/* The old refname is replaced by the new one below. */
		free(old_log.refname);

		/*
		 * Copy over the old reflog entry with the new refname.
		 */
		ALLOC_GROW(logs, logs_nr + 1, logs_alloc);
		logs[logs_nr] = old_log;
		logs[logs_nr].refname = xstrdup(arg->newname);
		logs_nr++;

		/*
		 * Delete the old reflog entry in case we are renaming.
		 */
		if (arg->delete_old) {
			ALLOC_GROW(logs, logs_nr + 1, logs_alloc);
			memset(&logs[logs_nr], 0, sizeof(logs[logs_nr]));
			logs[logs_nr].refname = xstrdup(arg->oldname);
			logs[logs_nr].value_type = REFTABLE_LOG_DELETION;
			logs[logs_nr].update_index = old_log.update_index;
			logs_nr++;
		}

		/*
		 * Transfer ownership of the log record we're iterating over to
		 * the array of log records. Otherwise, the pointers would get
		 * free'd or reallocated by the iterator.
		 */
		memset(&old_log, 0, sizeof(old_log));
	}

	ret = reftable_writer_add_logs(writer, logs, logs_nr);
	if (ret < 0)
		goto done;

done:
	assert(ret != REFTABLE_API_ERROR);
	reftable_iterator_destroy(&it);
	string_list_clear(&skip, 0);
	strbuf_release(&errbuf);
	for (i = 0; i < logs_nr; i++)
		reftable_log_record_release(&logs[i]);
	free(logs);
	for (i = 0; i < ARRAY_SIZE(refs); i++)
		reftable_ref_record_release(&refs[i]);
	reftable_ref_record_release(&old_ref);
	reftable_log_record_release(&old_log);
	return ret;
}

/*
 * Rename `oldrefname` to `newrefname`, carrying over its reflog. The work is
 * done by `write_copy_table()` with `delete_old` set, run as an addition to
 * the stack that backs the new refname.
 *
 * Returns 0 on success and a non-zero error code otherwise.
 */
static int reftable_be_rename_ref(struct ref_store *ref_store,
				  const char *oldrefname,
				  const char *newrefname,
				  const char *logmsg)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_WRITE, "rename_ref");
	struct write_copy_arg arg = {
		.refs = refs,
		.oldname = oldrefname,
		.newname = newrefname,
		.logmsg = logmsg,
		.delete_old = 1,
	};
	int ret = refs->err;

	if (ret >= 0) {
		ret = backend_for(&arg.be, refs, newrefname, &newrefname, 1);
		if (!ret)
			ret = reftable_stack_add(arg.be->stack, &write_copy_table, &arg,
						 REFTABLE_STACK_NEW_ADDITION_RELOAD);
	}

	assert(ret != REFTABLE_API_ERROR);
	return ret;
}

/*
 * Copy `oldrefname` to `newrefname`, including its reflog. The work is done
 * by `write_copy_table()` with `delete_old` left unset, run as an addition to
 * the stack that backs the new refname.
 *
 * Returns 0 on success and a non-zero error code otherwise.
 */
static int reftable_be_copy_ref(struct ref_store *ref_store,
				const char *oldrefname,
				const char *newrefname,
				const char *logmsg)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_WRITE, "copy_ref");
	struct write_copy_arg arg = {
		.refs = refs,
		.oldname = oldrefname,
		.newname = newrefname,
		.logmsg = logmsg,
	};
	int ret = refs->err;

	if (ret >= 0) {
		ret = backend_for(&arg.be, refs, newrefname, &newrefname, 1);
		if (!ret)
			ret = reftable_stack_add(arg.be->stack, &write_copy_table, &arg,
						 REFTABLE_STACK_NEW_ADDITION_RELOAD);
	}

	assert(ret != REFTABLE_API_ERROR);
	return ret;
}

/*
 * Ref iterator that yields the name of every reference that has a reflog in
 * a single reftable stack.
 */
struct reftable_reflog_iterator {
	/* Generic iterator state; must come first as the struct is cast to it. */
	struct ref_iterator base;
	/* Ref store the iterator was created for. */
	struct reftable_ref_store *refs;
	/* Underlying reftable log iterator. */
	struct reftable_iterator iter;
	/* Log record most recently read from `iter`. */
	struct reftable_log_record log;
	/* Refname yielded last; used to skip further records of the same ref. */
	struct strbuf last_name;
	/* Sticky status: 0 while iterating, positive when done, negative on error. */
	int err;
};

/*
 * Advance to the next reference that has a reflog. Log records belonging to
 * the refname yielded last and records whose refname is not well-formed are
 * skipped.
 *
 * Returns ITER_OK with `base.refname` set, ITER_DONE once the logs are
 * exhausted, or ITER_ERROR on failure. The status is sticky: once the
 * iterator finished or failed it keeps reporting that.
 */
static int reftable_reflog_iterator_advance(struct ref_iterator *ref_iterator)
{
	struct reftable_reflog_iterator *it =
		(struct reftable_reflog_iterator *)ref_iterator;

	while (!it->err) {
		const char *name;

		it->err = reftable_iterator_next_log(&it->iter, &it->log);
		if (it->err)
			break;

		name = it->log.refname;

		/*
		 * We are interested in the refnames that have reflogs, not in
		 * the individual entries, so only a new and valid name is
		 * yielded. This could be faster by seeking directly to
		 * reflog@update_index==0.
		 */
		if (strcmp(name, it->last_name.buf) &&
		    !check_refname_format(name, REFNAME_ALLOW_ONELEVEL)) {
			strbuf_reset(&it->last_name);
			strbuf_addstr(&it->last_name, name);
			it->base.refname = name;
			break;
		}
	}

	if (!it->err)
		return ITER_OK;
	return it->err > 0 ? ITER_DONE : ITER_ERROR;
}

/*
 * Seeking is not supported by the reflog iterator; calling this is a BUG.
 * The trailing return follows the BUG() call and yields -1.
 */
static int reftable_reflog_iterator_seek(struct ref_iterator *ref_iterator UNUSED,
					 const char *refname UNUSED,
					 unsigned int flags UNUSED)
{
	BUG("reftable reflog iterator cannot be seeked");
	return -1;
}

/*
 * Peeling is not supported by the reflog iterator; calling this is a BUG.
 * The trailing return follows the BUG() call and yields -1.
 */
static int reftable_reflog_iterator_peel(struct ref_iterator *ref_iterator UNUSED,
					 struct object_id *peeled UNUSED)
{
	BUG("reftable reflog iterator cannot be peeled");
	return -1;
}

/*
 * Release the resources held by a reflog iterator: the current log record,
 * the underlying reftable iterator and the buffer with the last refname.
 * The iterator structure itself is not freed here.
 */
static void reftable_reflog_iterator_release(struct ref_iterator *ref_iterator)
{
	struct reftable_reflog_iterator *it =
		(struct reftable_reflog_iterator *)ref_iterator;

	reftable_log_record_release(&it->log);
	reftable_iterator_destroy(&it->iter);
	strbuf_release(&it->last_name);
}

/*
 * Virtual table of the reflog iterator. Only advancing and releasing are
 * implemented; the seek and peel callbacks raise a BUG when called.
 */
static struct ref_iterator_vtable reftable_reflog_iterator_vtable = {
	.advance = reftable_reflog_iterator_advance,
	.seek = reftable_reflog_iterator_seek,
	.peel = reftable_reflog_iterator_peel,
	.release = reftable_reflog_iterator_release,
};

/*
 * Allocate a reflog iterator for `stack`. The stack is reloaded and a log
 * iterator is positioned at the very first log record.
 *
 * An iterator is always returned. If the ref store is in an error state or
 * any of the setup steps fails, the error is recorded in the iterator's
 * `err` member instead.
 */
static struct reftable_reflog_iterator *reflog_iterator_for_stack(struct reftable_ref_store *refs,
								  struct reftable_stack *stack)
{
	struct reftable_reflog_iterator *iter = xcalloc(1, sizeof(*iter));
	int ret;

	base_ref_iterator_init(&iter->base, &reftable_reflog_iterator_vtable);
	strbuf_init(&iter->last_name, 0);
	iter->refs = refs;

	ret = refs->err;
	if (!ret) {
		/* Each step only runs if the previous one did not fail. */
		ret = reftable_stack_reload(stack);
		if (ret >= 0)
			ret = reftable_stack_init_log_iterator(stack, &iter->iter);
		if (ret >= 0)
			ret = reftable_iterator_seek_log(&iter->iter, "");
	}

	iter->err = ret;
	return iter;
}

static struct ref_iterator *reftable_be_reflog_iterator_begin(struct ref_store *ref_store)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_READ, "reflog_iterator_begin");
	struct reftable_reflog_iterator *main_iter, *worktree_iter;

	main_iter = reflog_iterator_for_stack(refs, refs->main_backend.stack);
	if (!refs->worktree_backend.stack)
		return &main_iter->base;

	worktree_iter = reflog_iterator_for_stack(refs, refs->worktree_backend.stack);

	return merge_ref_iterator_begin(&worktree_iter->base, &main_iter->base,
					ref_iterator_select, NULL);
}

/*
 * Hand a single reflog record to the callback `fn`.
 *
 * The old and new hashes of the record are converted to object IDs using the
 * repository's hash algorithm. A record whose old and new object IDs are both
 * null is the reflog existence marker; it is hidden from the callback and 0
 * is returned for it. For every other record the committer identity is
 * formatted from the stored name and email (without a date) and the return
 * value of `fn` is passed through.
 */
static int yield_log_record(struct reftable_ref_store *refs,
			    struct reftable_log_record *log,
			    each_reflog_ent_fn fn,
			    void *cb_data)
{
	struct object_id before, after;

	oidread(&before, log->value.update.old_hash, refs->base.repo->hash_algo);
	oidread(&after, log->value.update.new_hash, refs->base.repo->hash_algo);

	if (!is_null_oid(&before) || !is_null_oid(&after)) {
		const char *committer_ident =
			fmt_ident(log->value.update.name, log->value.update.email,
				  WANT_COMMITTER_IDENT, NULL, IDENT_NO_DATE);

		return fn(log->refname, &before, &after, committer_ident,
			  log->value.update.time, log->value.update.tz_offset,
			  log->value.update.message, cb_data);
	}

	/* Existence marker: the caller must never see it. */
	return 0;
}

/*
 * Invoke `fn` for every reflog entry of `refname`, in the order in which the
 * log iterator of the backing stack returns them.
 *
 * Iteration stops at the first record that belongs to a different ref, at the
 * end of the log, on an iterator error, or as soon as `fn` (via
 * yield_log_record()) returns non-zero. Returns 0 when the log was walked to
 * its end, otherwise the error code or the non-zero callback result.
 */
static int reftable_be_for_each_reflog_ent_reverse(struct ref_store *ref_store,
						   const char *refname,
						   each_reflog_ent_fn fn,
						   void *cb_data)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_READ, "for_each_reflog_ent_reverse");
	struct reftable_iterator log_it = {0};
	struct reftable_log_record record = {0};
	struct reftable_backend *be;
	int ret;

	if (refs->err < 0)
		return refs->err;

	/*
	 * TODO: we should adapt this callsite to reload the stack. There is no
	 * obvious reason why we shouldn't.
	 */
	ret = backend_for(&be, refs, refname, &refname, 0);
	if (ret)
		goto out;

	ret = reftable_stack_init_log_iterator(be->stack, &log_it);
	if (ret < 0)
		goto out;

	ret = reftable_iterator_seek_log(&log_it, refname);
	if (ret)
		goto out;

	for (;;) {
		ret = reftable_iterator_next_log(&log_it, &record);
		if (ret < 0)
			goto out;

		/* End of the log, or the first record of another ref. */
		if (ret > 0 || strcmp(record.refname, refname)) {
			ret = 0;
			goto out;
		}

		ret = yield_log_record(refs, &record, fn, cb_data);
		if (ret)
			goto out;
	}

out:
	reftable_log_record_release(&record);
	reftable_iterator_destroy(&log_it);
	return ret;
}

/*
 * Invoke `fn` for every reflog entry of `refname`, in the opposite order of
 * the one in which the log iterator of the backing stack returns them.
 *
 * All records of the ref are first collected into a temporary array, which is
 * then walked from its last element to its first. Iteration over the array
 * stops as soon as `fn` (via yield_log_record()) returns non-zero.
 *
 * Returns 0 when all entries were yielded, a negative code when setting up or
 * advancing the iterator failed, or the non-zero callback result.
 */
static int reftable_be_for_each_reflog_ent(struct ref_store *ref_store,
					   const char *refname,
					   each_reflog_ent_fn fn,
					   void *cb_data)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_READ, "for_each_reflog_ent");
	struct reftable_log_record *logs = NULL;
	struct reftable_iterator it = {0};
	struct reftable_backend *be;
	size_t logs_alloc = 0, logs_nr = 0, i;
	int ret;

	if (refs->err < 0)
		return refs->err;

	/*
	 * TODO: we should adapt this callsite to reload the stack. There is no
	 * obvious reason why we shouldn't.
	 */
	ret = backend_for(&be, refs, refname, &refname, 0);
	if (ret)
		goto done;

	ret = reftable_stack_init_log_iterator(be->stack, &it);
	if (ret < 0)
		goto done;

	ret = reftable_iterator_seek_log(&it, refname);
	while (!ret) {
		struct reftable_log_record log = {0};

		ret = reftable_iterator_next_log(&it, &log);
		if (ret < 0) {
			/*
			 * The record has not been handed over to `logs` yet,
			 * so nobody else would release whatever the iterator
			 * may have stored in it before failing.
			 */
			reftable_log_record_release(&log);
			goto done;
		}
		if (ret > 0 || strcmp(log.refname, refname)) {
			/* End of the log, or the first record of another ref. */
			reftable_log_record_release(&log);
			ret = 0;
			break;
		}

		/* Ownership of the record moves into the array. */
		ALLOC_GROW(logs, logs_nr + 1, logs_alloc);
		logs[logs_nr++] = log;
	}

	/* Walk the collected records back to front. */
	for (i = logs_nr; i--;) {
		ret = yield_log_record(refs, &logs[i], fn, cb_data);
		if (ret)
			goto done;
	}

done:
	reftable_iterator_destroy(&it);
	for (i = 0; i < logs_nr; i++)
		reftable_log_record_release(&logs[i]);
	free(logs);
	return ret;
}

static int reftable_be_reflog_exists(struct ref_store *ref_store,
				     const char *refname)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_READ, "reflog_exists");
	struct reftable_log_record log = {0};
	struct reftable_iterator it = {0};
	struct reftable_backend *be;
	int ret;

	ret = refs->err;
	if (ret < 0)
		goto done;

	ret = backend_for(&be, refs, refname, &refname, 1);
	if (ret < 0)
		goto done;

	ret = reftable_stack_init_log_iterator(be->stack, &it);
	if (ret < 0)
		goto done;

	ret = reftable_iterator_seek_log(&it, refname);
	if (ret < 0)
		goto done;

	/*
	 * Check whether we get at least one log record for the given ref name.
	 * If so, the reflog exists, otherwise it doesn't.
	 */
	ret = reftable_iterator_next_log(&it, &log);
	if (ret < 0)
		goto done;
	if (ret > 0) {
		ret = 0;
		goto done;
	}

	ret = strcmp(log.refname, refname) == 0;

done:
	reftable_iterator_destroy(&it);
	reftable_log_record_release(&log);
	if (ret < 0)
		ret = 0;
	return ret;
}

/* Payload passed to write_reflog_existence_table() through reftable_stack_add(). */
struct write_reflog_existence_arg {
	/* Ref store the request came from; not read by the table writer. */
	struct reftable_ref_store *refs;
	/* Name of the ref whose reflog existence marker shall be written. */
	const char *refname;
	/* Stack that is checked for an existing log and that supplies the update index. */
	struct reftable_stack *stack;
};

static int write_reflog_existence_table(struct reftable_writer *writer,
					void *cb_data)
{
	struct write_reflog_existence_arg *arg = cb_data;
	uint64_t ts = reftable_stack_next_update_index(arg->stack);
	struct reftable_log_record log = {0};
	int ret;

	ret = reftable_stack_read_log(arg->stack, arg->refname, &log);
	if (ret <= 0)
		goto done;

	ret = reftable_writer_set_limits(writer, ts, ts);
	if (ret < 0)
		goto done;

	/*
	 * The existence entry has both old and new object ID set to the
	 * null object ID. Our iterators are aware of this and will not present
	 * them to their callers.
	 */
	log.refname = xstrdup(arg->refname);
	log.update_index = ts;
	log.value_type = REFTABLE_LOG_UPDATE;
	ret = reftable_writer_add_log(writer, &log);

done:
	assert(ret != REFTABLE_API_ERROR);
	reftable_log_record_release(&log);
	return ret;
}

/*
 * Create an (empty) reflog for `refname` by adding a table that contains the
 * reflog existence marker, unless the ref already has log records (see
 * write_reflog_existence_table()).
 *
 * `errmsg` is unused. Returns the ref store's pending error if it has one,
 * otherwise the result of backend_for() or reftable_stack_add().
 */
static int reftable_be_create_reflog(struct ref_store *ref_store,
				     const char *refname,
				     struct strbuf *errmsg UNUSED)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_WRITE, "create_reflog");
	struct reftable_backend *be;
	struct write_reflog_existence_arg arg = {
		.refs = refs,
	};
	int ret;

	ret = refs->err;
	if (ret < 0)
		goto done;

	ret = backend_for(&be, refs, refname, &refname, 1);
	if (ret)
		goto done;
	arg.stack = be->stack;
	/*
	 * backend_for() is handed `&refname` and may replace it with the name
	 * under which the ref lives in the selected stack. The writer callback
	 * must use that name, like the lookups in reftable_be_reflog_exists()
	 * and reftable_be_reflog_expire() do, so it is only captured now.
	 */
	arg.refname = refname;

	ret = reftable_stack_add(be->stack, &write_reflog_existence_table, &arg,
				 REFTABLE_STACK_NEW_ADDITION_RELOAD);

done:
	return ret;
}

/* Payload passed to write_reflog_delete_table() through reftable_stack_add(). */
struct write_reflog_delete_arg {
	/* Stack whose log records are iterated and that supplies the update index. */
	struct reftable_stack *stack;
	/* Name of the ref whose reflog entries shall be deleted. */
	const char *refname;
};

static int write_reflog_delete_table(struct reftable_writer *writer, void *cb_data)
{
	struct write_reflog_delete_arg *arg = cb_data;
	struct reftable_log_record log = {0}, tombstone = {0};
	struct reftable_iterator it = {0};
	uint64_t ts = reftable_stack_next_update_index(arg->stack);
	int ret;

	ret = reftable_writer_set_limits(writer, ts, ts);
	if (ret < 0)
		goto out;

	ret = reftable_stack_init_log_iterator(arg->stack, &it);
	if (ret < 0)
		goto out;

	/*
	 * In order to delete a table we need to delete all reflog entries one
	 * by one. This is inefficient, but the reftable format does not have a
	 * better marker right now.
	 */
	ret = reftable_iterator_seek_log(&it, arg->refname);
	while (ret == 0) {
		ret = reftable_iterator_next_log(&it, &log);
		if (ret < 0)
			break;
		if (ret > 0 || strcmp(log.refname, arg->refname)) {
			ret = 0;
			break;
		}

		tombstone.refname = (char *)arg->refname;
		tombstone.value_type = REFTABLE_LOG_DELETION;
		tombstone.update_index = log.update_index;

		ret = reftable_writer_add_log(writer, &tombstone);
	}

out:
	reftable_log_record_release(&log);
	reftable_iterator_destroy(&it);
	return ret;
}

static int reftable_be_delete_reflog(struct ref_store *ref_store,
				     const char *refname)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_WRITE, "delete_reflog");
	struct reftable_backend *be;
	struct write_reflog_delete_arg arg = {
		.refname = refname,
	};
	int ret;

	ret = backend_for(&be, refs, refname, &refname, 1);
	if (ret)
		return ret;
	arg.stack = be->stack;

	ret = reftable_stack_add(be->stack, &write_reflog_delete_table, &arg,
				 REFTABLE_STACK_NEW_ADDITION_RELOAD);

	assert(ret != REFTABLE_API_ERROR);
	return ret;
}

/* Payload passed to write_reflog_expiry_table() through reftable_addition_add(). */
struct reflog_expiry_arg {
	/* Ref store; its repository is used to peel `update_oid`. */
	struct reftable_ref_store *refs;
	/* Stack that supplies the update index for the new table. */
	struct reftable_stack *stack;
	/* Log records to write; records marked as deletions expire their entry. */
	struct reftable_log_record *records;
	/* When not the null object ID, the ref is additionally updated to this value. */
	struct object_id update_oid;
	/* Name of the ref whose reflog is being expired. */
	const char *refname;
	/* Number of elements in `records`. */
	size_t len;
};

static int write_reflog_expiry_table(struct reftable_writer *writer, void *cb_data)
{
	struct reflog_expiry_arg *arg = cb_data;
	uint64_t ts = reftable_stack_next_update_index(arg->stack);
	uint64_t live_records = 0;
	size_t i;
	int ret;

	for (i = 0; i < arg->len; i++)
		if (arg->records[i].value_type == REFTABLE_LOG_UPDATE)
			live_records++;

	ret = reftable_writer_set_limits(writer, ts, ts);
	if (ret < 0)
		return ret;

	if (!is_null_oid(&arg->update_oid)) {
		struct reftable_ref_record ref = {0};
		struct object_id peeled;

		ref.refname = (char *)arg->refname;
		ref.update_index = ts;

		if (!peel_object(arg->refs->base.repo, &arg->update_oid, &peeled)) {
			ref.value_type = REFTABLE_REF_VAL2;
			memcpy(ref.value.val2.target_value, peeled.hash, GIT_MAX_RAWSZ);
			memcpy(ref.value.val2.value, arg->update_oid.hash, GIT_MAX_RAWSZ);
		} else {
			ref.value_type = REFTABLE_REF_VAL1;
			memcpy(ref.value.val1, arg->update_oid.hash, GIT_MAX_RAWSZ);
		}

		ret = reftable_writer_add_ref(writer, &ref);
		if (ret < 0)
			return ret;
	}

	/*
	 * When there are no more entries left in the reflog we empty it
	 * completely, but write a placeholder reflog entry that indicates that
	 * the reflog still exists.
	 */
	if (!live_records) {
		struct reftable_log_record log = {
			.refname = (char *)arg->refname,
			.value_type = REFTABLE_LOG_UPDATE,
			.update_index = ts,
		};

		ret = reftable_writer_add_log(writer, &log);
		if (ret)
			return ret;
	}

	for (i = 0; i < arg->len; i++) {
		ret = reftable_writer_add_log(writer, &arg->records[i]);
		if (ret)
			return ret;
	}

	return 0;
}

/*
 * Expire entries from the reflog of `refname` according to a caller-supplied
 * policy.
 *
 * Parameters:
 *   flags           - EXPIRE_REFLOGS_REWRITE backfills the old hash of kept
 *                     entries with the new hash of the previously kept one;
 *                     EXPIRE_REFLOGS_UPDATE_REF additionally updates the ref
 *                     to the new hash of the last kept entry processed (only
 *                     if the ref currently resolves to a non-null object ID);
 *                     EXPIRE_REFLOGS_DRY_RUN builds the new table but does
 *                     not commit it.
 *   prepare_fn      - called once with the ref name and the object ID read
 *                     for it, before any entry is examined.
 *   should_prune_fn - called for each collected entry, walking the collected
 *                     array from its last element to its first; a non-zero
 *                     result marks the entry for deletion.
 *   cleanup_fn      - called on exit whenever the stack addition had been
 *                     created.
 *   policy_cb_data  - opaque pointer forwarded to all three callbacks.
 *
 * Returns 0 on success, otherwise the code of the step that failed.
 */
static int reftable_be_reflog_expire(struct ref_store *ref_store,
				     const char *refname,
				     unsigned int flags,
				     reflog_expiry_prepare_fn prepare_fn,
				     reflog_expiry_should_prune_fn should_prune_fn,
				     reflog_expiry_cleanup_fn cleanup_fn,
				     void *policy_cb_data)
{
	/*
	 * For log expiry, we write tombstones for every single reflog entry
	 * that is to be expired. This means that the entries are still
	 * retrievable by delving into the stack, and expiring entries
	 * paradoxically takes extra memory. This memory is only reclaimed when
	 * compacting the reftable stack.
	 *
	 * It would be better if the refs backend supported an API that sets a
	 * criterion for all refs, passing the criterion to pack_refs().
	 *
	 * On the plus side, because we do the expiration per ref, we can easily
	 * insert the reflog existence dummies.
	 */
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_WRITE, "reflog_expire");
	struct reftable_log_record *logs = NULL;
	struct reftable_log_record *rewritten = NULL;
	struct reftable_iterator it = {0};
	struct reftable_addition *add = NULL;
	struct reflog_expiry_arg arg = {0};
	struct reftable_backend *be;
	struct object_id oid = {0};
	struct strbuf referent = STRBUF_INIT;
	uint8_t *last_hash = NULL;
	size_t logs_nr = 0, logs_alloc = 0, i;
	unsigned int type = 0;
	int ret;

	if (refs->err < 0)
		return refs->err;

	ret = backend_for(&be, refs, refname, &refname, 1);
	if (ret < 0)
		goto done;

	ret = reftable_stack_new_addition(&add, be->stack,
					  REFTABLE_STACK_NEW_ADDITION_RELOAD);
	if (ret < 0)
		goto done;

	ret = reftable_stack_init_log_iterator(be->stack, &it);
	if (ret < 0)
		goto done;

	ret = reftable_iterator_seek_log(&it, refname);
	if (ret < 0)
		goto done;

	ret = reftable_backend_read_ref(be, refname, &oid, &referent, &type);
	if (ret < 0)
		goto done;
	prepare_fn(refname, &oid, policy_cb_data);

	/* Collect all real entries of this ref's reflog into `logs`. */
	while (1) {
		struct reftable_log_record log = {0};
		struct object_id old_oid, new_oid;

		ret = reftable_iterator_next_log(&it, &log);
		if (ret < 0) {
			/*
			 * The record has not been handed over to `logs` yet,
			 * so nobody else would release whatever the iterator
			 * may have stored in it before failing.
			 */
			reftable_log_record_release(&log);
			goto done;
		}
		if (ret > 0 || strcmp(log.refname, refname)) {
			reftable_log_record_release(&log);
			break;
		}

		oidread(&old_oid, log.value.update.old_hash,
			ref_store->repo->hash_algo);
		oidread(&new_oid, log.value.update.new_hash,
			ref_store->repo->hash_algo);

		/*
		 * Skip over the reflog existence marker. We will add it back
		 * in when there are no live reflog records.
		 */
		if (is_null_oid(&old_oid) && is_null_oid(&new_oid)) {
			reftable_log_record_release(&log);
			continue;
		}

		/* Ownership of the record moves into the array. */
		ALLOC_GROW(logs, logs_nr + 1, logs_alloc);
		logs[logs_nr++] = log;
	}

	/*
	 * We need to rewrite all reflog entries according to the pruning
	 * callback function:
	 *
	 *   - If a reflog entry shall be pruned we mark the record for
	 *     deletion.
	 *
	 *   - Otherwise we may have to rewrite the chain of reflog entries so
	 *     that gaps created by just-deleted records get backfilled.
	 */
	CALLOC_ARRAY(rewritten, logs_nr);
	for (i = logs_nr; i--;) {
		struct reftable_log_record *dest = &rewritten[i];
		struct object_id old_oid, new_oid;

		/*
		 * Plain struct copy: `rewritten` is only free()d at the end,
		 * the records themselves are released through `logs`.
		 */
		*dest = logs[i];
		oidread(&old_oid, logs[i].value.update.old_hash,
			ref_store->repo->hash_algo);
		oidread(&new_oid, logs[i].value.update.new_hash,
			ref_store->repo->hash_algo);

		if (should_prune_fn(&old_oid, &new_oid, logs[i].value.update.email,
				    (timestamp_t)logs[i].value.update.time,
				    logs[i].value.update.tz_offset,
				    logs[i].value.update.message,
				    policy_cb_data)) {
			dest->value_type = REFTABLE_LOG_DELETION;
		} else {
			if ((flags & EXPIRE_REFLOGS_REWRITE) && last_hash)
				memcpy(dest->value.update.old_hash, last_hash, GIT_MAX_RAWSZ);
			/* Taken from the unmodified record in `logs`, not from `dest`. */
			last_hash = logs[i].value.update.new_hash;
		}
	}

	if (flags & EXPIRE_REFLOGS_UPDATE_REF && last_hash && !is_null_oid(&oid))
		oidread(&arg.update_oid, last_hash, ref_store->repo->hash_algo);

	arg.refs = refs;
	arg.records = rewritten;
	arg.len = logs_nr;
	arg.stack = be->stack;
	arg.refname = refname;

	ret = reftable_addition_add(add, &write_reflog_expiry_table, &arg);
	if (ret < 0)
		goto done;

	/*
	 * Future improvement: we could skip writing records that were
	 * not changed.
	 */
	if (!(flags & EXPIRE_REFLOGS_DRY_RUN))
		ret = reftable_addition_commit(add);

done:
	/*
	 * NOTE(review): this also runs cleanup_fn when a step between creating
	 * the addition and calling prepare_fn failed - confirm that the
	 * cleanup callbacks tolerate being called without a prior prepare.
	 */
	if (add)
		cleanup_fn(policy_cb_data);
	assert(ret != REFTABLE_API_ERROR);

	reftable_iterator_destroy(&it);
	reftable_addition_destroy(add);
	for (i = 0; i < logs_nr; i++)
		reftable_log_record_release(&logs[i]);
	strbuf_release(&referent);
	free(logs);
	free(rewritten);
	return ret;
}

static void reftable_fsck_verbose_handler(const char *msg, void *cb_data)
{
	struct fsck_options *o = cb_data;

	if (o->verbose)
		fprintf_ln(stderr, "%s", msg);
}

/*
 * Maps reftable fsck error codes (used as array index) to fsck message IDs.
 * Indices without an explicit entry are zero, which
 * reftable_fsck_error_handler() treats as a missing mapping.
 */
static const enum fsck_msg_id fsck_msg_id_map[] = {
	[REFTABLE_FSCK_ERROR_TABLE_NAME] = FSCK_MSG_BAD_REFTABLE_TABLE_NAME,
};

static int reftable_fsck_error_handler(struct reftable_fsck_info *info,
				       void *cb_data)
{
	struct fsck_ref_report report = { .path = info->path };
	struct fsck_options *o = cb_data;
	enum fsck_msg_id msg_id;

	if (info->error < 0 || info->error >= REFTABLE_FSCK_MAX_VALUE)
		BUG("unknown fsck error: %d", (int)info->error);

	msg_id = fsck_msg_id_map[info->error];

	if (!msg_id)
		BUG("fsck_msg_id value missing for reftable error: %d", (int)info->error);

	return fsck_report_ref(o, &report, msg_id, "%s", info->msg);
}

/*
 * Backend entry point for consistency checks. Runs reftable_fsck_check() on
 * the main stack and on the stack of every entry in the store's
 * `worktree_backends` map, using `o` as callback payload. `wt` is unused.
 *
 * Returns the bitwise OR of all individual check results, i.e. 0 only if
 * every check returned 0.
 */
static int reftable_be_fsck(struct ref_store *ref_store, struct fsck_options *o,
			    struct worktree *wt UNUSED)
{
	struct reftable_ref_store *refs =
		reftable_be_downcast(ref_store, REF_STORE_READ, "fsck");
	struct hashmap_iter map_iter;
	struct strmap_entry *ent;
	int result;

	result = reftable_fsck_check(refs->main_backend.stack,
				     reftable_fsck_error_handler,
				     reftable_fsck_verbose_handler, o);

	strmap_for_each_entry(&refs->worktree_backends, &map_iter, ent) {
		struct reftable_backend *wt_backend = ent->value;

		result |= reftable_fsck_check(wt_backend->stack,
					      reftable_fsck_error_handler,
					      reftable_fsck_verbose_handler, o);
	}

	return result;
}

/*
 * Ref storage backend descriptor for the "reftable" format: wires the
 * reftable_be_* functions of this file into the generic backend interface.
 */
struct ref_storage_be refs_be_reftable = {
	.name = "reftable",
	.init = reftable_be_init,
	.release = reftable_be_release,
	.create_on_disk = reftable_be_create_on_disk,
	.remove_on_disk = reftable_be_remove_on_disk,

	/* Transactions. */
	.transaction_prepare = reftable_be_transaction_prepare,
	.transaction_finish = reftable_be_transaction_finish,
	.transaction_abort = reftable_be_transaction_abort,

	/* Maintenance and whole-ref operations. */
	.pack_refs = reftable_be_pack_refs,
	.optimize = reftable_be_optimize,
	.rename_ref = reftable_be_rename_ref,
	.copy_ref = reftable_be_copy_ref,

	/* Reading refs. */
	.iterator_begin = reftable_be_iterator_begin,
	.read_raw_ref = reftable_be_read_raw_ref,
	.read_symbolic_ref = reftable_be_read_symbolic_ref,

	/* Reflogs. */
	.reflog_iterator_begin = reftable_be_reflog_iterator_begin,
	.for_each_reflog_ent = reftable_be_for_each_reflog_ent,
	.for_each_reflog_ent_reverse = reftable_be_for_each_reflog_ent_reverse,
	.reflog_exists = reftable_be_reflog_exists,
	.create_reflog = reftable_be_create_reflog,
	.delete_reflog = reftable_be_delete_reflog,
	.reflog_expire = reftable_be_reflog_expire,

	/* Consistency checks. */
	.fsck = reftable_be_fsck,
};
