Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,10 @@ impl RelationalDB {
Ok(self.inner.alter_table_access_mut_tx(tx, name, access)?)
}

pub(crate) fn alter_table_event_flag(&self, tx: &mut MutTx, name: &str, is_event: bool) -> Result<(), DBError> {
Ok(self.inner.alter_table_event_flag_mut_tx(tx, name, is_event)?)
}

pub(crate) fn alter_table_primary_key(
&self,
tx: &mut MutTx,
Expand Down
108 changes: 108 additions & 0 deletions crates/core/src/db/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,27 @@ fn auto_migrate_database(
let table_def = plan.new.stored_in_table_def(&table_name.clone().into()).unwrap();
stdb.alter_table_access(tx, table_name, table_def.table_access.into())?;
}
spacetimedb_schema::auto_migrate::AutoMigrateStep::ChangeEventFlag(table_name) => {
let table_def: &TableDef = plan.new.expect_lookup(table_name);
let table_id = stdb
.table_id_from_name_mut(tx, table_name)?
.expect("ChangeEventFlag references a table that should exist");

// Pre-validate: flipping is only safe when the table has no committed rows.
if stdb.table_row_count_mut(tx, table_id).unwrap_or(0) > 0 {
anyhow::bail!(
"Cannot change `event` flag on table `{table_name}`: table contains data. \
Clear the table's rows (e.g. via a reducer) before toggling the `event` annotation."
);
}

Comment thread
Centril marked this conversation as resolved.
log!(
logger,
"Changing `event` flag on table `{table_name}` to `{}`",
table_def.is_event
);
stdb.alter_table_event_flag(tx, table_name, table_def.is_event)?;
}
spacetimedb_schema::auto_migrate::AutoMigrateStep::ChangePrimaryKey(table_name) => {
let table_def = plan.new.stored_in_table_def(&table_name.clone().into()).unwrap();
log!(logger, "Changing primary key for table `{table_name}`");
Expand Down Expand Up @@ -339,6 +360,8 @@ mod test {
db::relational_db::tests_utils::{begin_mut_tx, insert, TestDB},
host::module_host::create_table_from_def,
};
use pretty_assertions::assert_matches;
use spacetimedb_datastore::locking_tx_datastore::test_helpers::assert_is_event_state;
use spacetimedb_datastore::locking_tx_datastore::PendingSchemaChange;
use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, TableAccess};
use spacetimedb_sats::{product, AlgebraicType, AlgebraicType::U64};
Expand Down Expand Up @@ -580,4 +603,89 @@ mod test {
);
Ok(())
}

/// Build a minimal v10 module with a single user table `events` whose
/// `is_event` flag matches `is_event`.
fn single_event_table_module_v10(is_event: bool) -> ModuleDef {
use spacetimedb_lib::db::raw_def::v10::RawModuleDefV10Builder;

let mut builder = RawModuleDefV10Builder::new();
builder
.build_table_with_new_type("events", [("id", U64)], true)
.with_event(is_event)
.finish();
builder
.finish()
.try_into()
.expect("should be a valid v10 module definition")
}

/// Create a non-event `events` table from the schema of `single_event_table_module_v10(false)`
/// in a fresh tx, commit it, and return the `TableId`. Leaves the table empty.
fn setup_events_table(stdb: &TestDB, module: &ModuleDef) -> anyhow::Result<TableId> {
let mut tx = begin_mut_tx(stdb);
for def in module.tables() {
create_table_from_def(stdb, &mut tx, module, def)?;
}
let table_id = stdb
.table_id_from_name_mut(&tx, "events")?
.expect("table should exist");
stdb.commit_tx(tx)?;
Ok(table_id)
}

#[test]
fn change_event_flag_empty_table_succeeds() -> anyhow::Result<()> {
let auth_ctx = AuthCtx::for_testing();
let stdb = TestDB::durable()?;

let old = single_event_table_module_v10(false);
let new = single_event_table_module_v10(true);
let table_id = setup_events_table(&stdb, &old)?;

let mut tx = begin_mut_tx(&stdb);
assert_is_event_state(&tx, table_id, false);

let plan = ponder_migrate(&old, &new)?;
let res = update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?;

assert!(
matches!(res, UpdateResult::RequiresClientDisconnect),
"flipping the `event` flag should disconnect clients"
);
assert_is_event_state(&tx, table_id, true);
assert_matches!(
tx.pending_schema_changes(),
[PendingSchemaChange::TableAlterEventFlag(t, false), ..] if *t == table_id
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use assert_eq! here instead, it's simpler.

);
Ok(())
}

#[test]
fn change_event_flag_nonempty_table_fails() -> anyhow::Result<()> {
let auth_ctx = AuthCtx::for_testing();
let stdb = TestDB::durable()?;

let old = single_event_table_module_v10(false);
let new = single_event_table_module_v10(true);
let table_id = setup_events_table(&stdb, &old)?;

// Insert a row in a separate tx so the pre-flip table state is committed.
let mut tx = begin_mut_tx(&stdb);
insert(&stdb, &mut tx, table_id, &product![42u64])?;
stdb.commit_tx(tx)?;

let mut tx = begin_mut_tx(&stdb);
let plan = ponder_migrate(&old, &new)?;
let err = update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)
.err()
.expect("flipping `is_event` on a non-empty table should fail");
assert!(
err.to_string().contains("contains data"),
"error should mention that the table contains data, got: {err}"
);
assert_is_event_state(&tx, table_id, false);
assert_eq!(tx.pending_schema_changes(), []);
Ok(())
}
}
6 changes: 6 additions & 0 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,12 @@ impl CommittedState {
let table = self.tables.get_mut(&table_id)?;
table.with_mut_schema(|s| s.table_access = access);
}
// A table's `is_event` flag was changed. Change back to the old one.
TableAlterEventFlag(table_id, old_is_event) => {
let table = self.tables.get_mut(&table_id)?;
Comment thread
Centril marked this conversation as resolved.
assert_eq!(table.num_rows(), 0);
table.with_mut_schema(|s| s.is_event = old_is_event);
}
// A table's primary key was changed. Change back to the old one.
TableAlterPrimaryKey(table_id, old_pk) => {
let table = self.tables.get_mut(&table_id)?;
Expand Down
107 changes: 107 additions & 0 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ impl Locking {
tx.alter_table_access(table_id, access)
}

pub fn alter_table_event_flag_mut_tx(&self, tx: &mut MutTxId, name: &str, is_event: bool) -> Result<()> {
let table_id = self
.table_id_from_name_mut_tx(tx, name)?
.ok_or_else(|| TableError::NotFound(name.into()))?;

tx.alter_table_event_flag(table_id, is_event)
}

pub fn alter_table_primary_key_mut_tx(
&self,
tx: &mut MutTxId,
Expand Down Expand Up @@ -3141,6 +3149,105 @@ pub(crate) mod tests {
Ok(())
}

#[test]
fn test_alter_table_event_flag_non_event_to_event() -> ResultTest<()> {
Comment thread
Centril marked this conversation as resolved.
use crate::locking_tx_datastore::test_helpers::check_table_event_flag_altered;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be imported once for the test module instead of in each function.


let (datastore, tx, table_id) = setup_table()?;
commit(&datastore, tx)?;

let mut tx = begin_mut_tx(&datastore);
check_table_event_flag_altered(&datastore, &tx, table_id, false);

tx.alter_table_event_flag(table_id, true)?;
assert_matches!(
tx.pending_schema_changes(),
&[PendingSchemaChange::TableAlterEventFlag(t, false)] if t == table_id
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repeated 3x; Let's make a function out of this: check_table_event_flag_altered(&tx, table_id, state: bool)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in ead511c3f4. The new check_table_event_flag_altered(datastore, tx, table_id, expected_is_event) helper in test_helpers.rs bundles the schema is_event check with the st_event_table row-presence check. The 4 test_alter_table_event_flag_* tests each call it in lieu of the inline pattern.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By this, I meant a function that checks pending_schema_changes. This function should be named check_table_event_flag_altered. The one you made in test_helpers doesn't do what its name suggests.

check_table_event_flag_altered(&datastore, &tx, table_id, true);

let tx_data = commit(&datastore, tx)?;
// Flipping to event inserts one row into `st_event_table`
// and does not touch the user table's row data.
assert_eq!(
tx_data.inserts_for_table(ST_EVENT_TABLE_ID).map(<[_]>::len),
Some(1),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please assert the contents of the row too.

);
assert_eq!(tx_data.inserts_for_table(table_id), None);
assert_eq!(tx_data.deletes_for_table(table_id), None);

let tx = begin_mut_tx(&datastore);
check_table_event_flag_altered(&datastore, &tx, table_id, true);
Ok(())
}

#[test]
fn test_alter_table_event_flag_event_to_non_event() -> ResultTest<()> {
use crate::locking_tx_datastore::test_helpers::check_table_event_flag_altered;

let (datastore, tx, table_id) = setup_event_table()?;
commit(&datastore, tx)?;

let mut tx = begin_mut_tx(&datastore);
check_table_event_flag_altered(&datastore, &tx, table_id, true);

tx.alter_table_event_flag(table_id, false)?;
assert_matches!(
tx.pending_schema_changes(),
&[PendingSchemaChange::TableAlterEventFlag(t, true)] if t == table_id
);
check_table_event_flag_altered(&datastore, &tx, table_id, false);

let tx_data = commit(&datastore, tx)?;
// Flipping away from event deletes one row from `st_event_table`
// and does not touch the user table's row data.
assert_eq!(
tx_data.deletes_for_table(ST_EVENT_TABLE_ID).map(<[_]>::len),
Some(1),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please assert the row contents here too.

);
assert_eq!(tx_data.inserts_for_table(table_id), None);
assert_eq!(tx_data.deletes_for_table(table_id), None);

let tx = begin_mut_tx(&datastore);
check_table_event_flag_altered(&datastore, &tx, table_id, false);
Ok(())
}

#[test]
fn test_alter_table_event_flag_rollback_reverts_live_state_and_st_event_table() -> ResultTest<()> {
use crate::locking_tx_datastore::test_helpers::check_table_event_flag_altered;

let (datastore, tx, table_id) = setup_table()?;
commit(&datastore, tx)?;

let mut tx = begin_mut_tx(&datastore);
check_table_event_flag_altered(&datastore, &tx, table_id, false);

tx.alter_table_event_flag(table_id, true)?;
assert_matches!(
tx.pending_schema_changes(),
&[PendingSchemaChange::TableAlterEventFlag(t, false)] if t == table_id
);
check_table_event_flag_altered(&datastore, &tx, table_id, true);
let _ = datastore.rollback_mut_tx(tx);

let tx = begin_mut_tx(&datastore);
assert_eq!(tx.pending_schema_changes(), []);
check_table_event_flag_altered(&datastore, &tx, table_id, false);
Ok(())
}

#[test]
fn test_alter_table_event_flag_idempotent_no_pending_change() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
commit(&datastore, tx)?;

let mut tx = begin_mut_tx(&datastore);
tx.alter_table_event_flag(table_id, false)?;
assert_eq!(tx.pending_schema_changes(), []);
Ok(())
}

#[test]
fn test_alter_table_row_type_rejects_some_bad_changes() -> ResultTest<()> {
let datastore = get_datastore()?;
Expand Down
2 changes: 2 additions & 0 deletions crates/datastore/src/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub use tx::{NumDistinctValues, TxId};
mod tx_state;
#[cfg(any(test, feature = "test"))]
pub use tx_state::PendingSchemaChange;
#[cfg(any(test, feature = "test"))]
pub mod test_helpers;

use parking_lot::{
lock_api::{ArcMutexGuard, ArcRwLockReadGuard, ArcRwLockWriteGuard},
Expand Down
49 changes: 46 additions & 3 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
error::{IndexError, SequenceError, TableError},
system_tables::{
with_sys_table_buf, StClientFields, StClientRow, StColumnAccessorFields, StColumnAccessorRow, StColumnFields,
StColumnRow, StConstraintFields, StConstraintRow, StEventTableRow, StFields as _, StIndexAccessorFields,
StColumnRow, StConstraintFields, StConstraintRow, StEventTableFields, StEventTableRow, StFields as _, StIndexAccessorFields,
StIndexAccessorRow, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow,
StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableAccessorFields, StTableAccessorRow,
StTableFields, StTableRow, SystemTable, ST_CLIENT_ID, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID,
Expand Down Expand Up @@ -691,8 +691,7 @@ impl MutTxId {

// Insert into st_event_table if this is an event table.
if is_event {
let row = StEventTableRow { table_id };
self.insert_via_serialize_bsatn(ST_EVENT_TABLE_ID, &row)?;
self.insert_st_event_table_row(table_id)?;
}

// Create the indexes for the table.
Expand Down Expand Up @@ -1082,6 +1081,50 @@ impl MutTxId {
Ok(())
}

/// Change the `is_event` flag of the table identified by `table_id`.
///
/// Updates both the in-memory schema and the `st_event_table` system table.
/// This is a breaking change for subscribed clients (the committed state
/// semantics of the table flip), so callers must arrange a `DisconnectAllUsers`.
pub(crate) fn alter_table_event_flag(&mut self, table_id: TableId, is_event: bool) -> Result<()> {
// Write to the table in the tx state (and clone into commit state).
let ((tx_table, ..), (commit_table, ..)) = self.get_or_create_insert_table_mut(table_id)?;
let old_is_event = tx_table.get_schema().is_event;
if old_is_event == is_event {
// Idempotent no-op; do not record a pending change or it would confuse rollback.
return Ok(());
}
tx_table.with_mut_schema_and_clone(commit_table, |s| s.is_event = is_event);

// Update `st_event_table`.
if is_event {
self.insert_st_event_table_row(table_id)?;
} else {
self.delete_st_event_table_row(table_id)?;
}

// Remember the pending change so we can undo if necessary.
self.push_schema_change(PendingSchemaChange::TableAlterEventFlag(table_id, old_is_event));

Ok(())
}

/// Inserts a row into `st_event_table` marking `table_id` as an event table.
fn insert_st_event_table_row(&mut self, table_id: TableId) -> Result<()> {
let row = StEventTableRow { table_id };
self.insert_via_serialize_bsatn(ST_EVENT_TABLE_ID, &row)?;
Ok(())
}

/// Drops the row in `st_event_table` for this `table_id`.
fn delete_st_event_table_row(&mut self, table_id: TableId) -> Result<()> {
self.delete_col_eq(
ST_EVENT_TABLE_ID,
StEventTableFields::TableId.col_id(),
&table_id.into(),
)
}

/// Change the primary key of the table identified by `table_id`.
///
/// Updates both the in-memory schema and the `st_table` system table.
Expand Down
Loading