Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
570 changes: 566 additions & 4 deletions protos/protos.d.ts

Large diffs are not rendered by default.

1,980 changes: 1,958 additions & 22 deletions protos/protos.js

Large diffs are not rendered by default.

263 changes: 239 additions & 24 deletions protos/protos.json

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions src/multiplexed-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,18 @@ export class MultiplexedSession
*/
getSession(callback: GetSessionCallback): void {
this._acquire().then(
session => callback(null, session, session?.txn),
session =>
callback(
null,
session,
session!.transaction((session!.parent as Database).queryOptions_),
),
callback,
);
}

/**
* Acquires a session asynchronously, and prepares the transaction for the session.
* Acquires a session asynchronously.
*
* Once a session is successfully acquired, it returns the session object (which may be `null` if unsuccessful).
*
Expand All @@ -193,10 +198,6 @@ export class MultiplexedSession
*/
async _acquire(): Promise<Session | null> {
const session = await this._getSession();
// Prepare a transaction for a session
session!.txn = session!.transaction(
(session!.parent as Database).queryOptions_,
);
return session;
}

Expand Down
3 changes: 2 additions & 1 deletion src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,8 @@ export class Snapshot extends EventEmitter {
if (
this._latestPreCommitToken === null ||
this._latestPreCommitToken === undefined ||
this._latestPreCommitToken!.seqNum! < resp.precommitToken!.seqNum!
(resp.precommitToken &&
this._latestPreCommitToken!.seqNum! < resp.precommitToken!.seqNum!)
) {
this._latestPreCommitToken = resp.precommitToken;
}
Expand Down
52 changes: 52 additions & 0 deletions system-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9191,6 +9191,58 @@ describe('Spanner', () => {
commitTransaction(done, PG_DATABASE, postgreSqlTable);
});

describe('parallel transactions', async () => {
async function insertAndCommitTransaction(database, sync, table, key) {
await database.runTransactionAsync(async transaction => {
// read from table TxnTable
await transaction.run('SELECT * FROM TxnTable');

// insert mutation
transaction!.insert(table.name, {
Key: key,
StringValue: 'v6',
});

// increment the shared counter
sync.count++;
if (sync.count === sync.target) {
// resolve the commit promise so that both the threads can continue to commit the transaction
sync.resolveCommitPromise();
}

// wait till the commit promise is resolved
await sync.promise;

// commit transaction once both the transactions are ready to commit
await transaction!.commit();
});
}

it('should insert and commit transaction when running parallely', async () => {
const promises: Promise<void>[] = [];
let resolvePromise;
const commitPromise = new Promise(
resolve => (resolvePromise = resolve),
);
const sync = {
target: 2, // both the transactions to be ready
count: 0, // 0 transactions are ready so far
promise: commitPromise, // the promise both the transactions wait at
resolveCommitPromise: () => resolvePromise(), // the function to resolve the commit promise
};
// run the transactions in parallel
promises.push(
insertAndCommitTransaction(DATABASE, sync, googleSqlTable, 'k1100'),
);
promises.push(
insertAndCommitTransaction(DATABASE, sync, googleSqlTable, 'k1101'),
);

// wait for both the transactions to complete their execution
await Promise.all(promises);
});
});

const rollbackTransaction = (done, database) => {
database.runTransaction((err, transaction) => {
assert.ifError(err);
Expand Down
96 changes: 72 additions & 24 deletions test/mockserver/mockspanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ export class MockSpanner {
protobuf.Session
>();
private mutationOnly: boolean;
private transactionSeqNum: Map<string, number> = new Map<string, number>();
private transactionCounters: Map<string, number> = new Map<string, number>();
private transactions: Map<string, protobuf.Transaction> = new Map<
string,
Expand Down Expand Up @@ -679,10 +680,12 @@ export class MockSpanner {
this.pushRequest(call.request!, call.metadata);
this.simulateExecutionTime(this.executeStreamingSql.name)
.then(() => {
let transactionKey;
if (call.request!.transaction) {
const fullTransactionId = `${call.request!.session}/transactions/${
call.request!.transaction.id
}`;
transactionKey = fullTransactionId;
if (this.abortedTransactions.has(fullTransactionId)) {
call.sendMetadata(new Metadata());
call.emit(
Expand All @@ -701,29 +704,37 @@ export class MockSpanner {
call.request!.session,
call.request!.transaction.begin,
);
const precommitToken = session?.multiplexed
? protobuf.MultiplexedSessionPrecommitToken.create({
precommitToken: Buffer.from('mock-precommit-token'),
seqNum: randomInt(1, 1000),
})
: null;
if (txn instanceof Error) {
call.sendMetadata(new Metadata());
call.emit('error', txn);
call.end();
return;
}
transactionKey = `${call.request!.session}/transactions/${txn.id.toString()}`;
if (res.type === StatementResultType.RESULT_SET) {
(res.resultSet as protobuf.ResultSet).metadata!.transaction = txn;
(res.resultSet as protobuf.ResultSet).precommitToken =
precommitToken;
}
}

// get the current seqNum
const currentSeqNum = this.transactionSeqNum.get(transactionKey) || 0;
const nextSeqNum = currentSeqNum + 1;

// set the next seqNum
this.transactionSeqNum.set(transactionKey, nextSeqNum);
const precommitToken = session?.multiplexed
? protobuf.MultiplexedSessionPrecommitToken.create({
precommitToken: Buffer.from('mock-precommit-token'),
seqNum: nextSeqNum,
})
: null;
let partialResultSets;
let resumeIndex;
let streamErr;
switch (res.type) {
case StatementResultType.RESULT_SET:
(res.resultSet as protobuf.ResultSet).precommitToken =
precommitToken;
if (Array.isArray(res.resultSet)) {
partialResultSets = res.resultSet;
} else {
Expand Down Expand Up @@ -758,6 +769,7 @@ export class MockSpanner {
case StatementResultType.UPDATE_COUNT:
call.write(
MockSpanner.emptyPartialResultSet(
precommitToken,
Buffer.from('1'.padStart(8, '0')),
),
);
Expand All @@ -770,7 +782,9 @@ export class MockSpanner {
call.emit('error', streamErr);
break;
}
call.write(MockSpanner.toPartialResultSet(res.updateCount));
call.write(
MockSpanner.toPartialResultSet(precommitToken, res.updateCount),
);
break;
case StatementResultType.ERROR:
call.sendMetadata(new Metadata());
Expand Down Expand Up @@ -816,6 +830,7 @@ export class MockSpanner {
const partial = protobuf.PartialResultSet.create({
resumeToken: Buffer.from(token),
values: [],
precommitToken: resultSet.precommitToken,
});
for (
let row = i;
Expand All @@ -841,14 +856,23 @@ export class MockSpanner {
}

private static emptyPartialResultSet(
precommitToken:
| protobuf.IMultiplexedSessionPrecommitToken
| null
| undefined,
resumeToken: Uint8Array,
): protobuf.PartialResultSet {
return protobuf.PartialResultSet.create({
resumeToken,
precommitToken: precommitToken,
});
}

private static toPartialResultSet(
precommitToken:
| protobuf.IMultiplexedSessionPrecommitToken
| null
| undefined,
rowCount: number,
): protobuf.PartialResultSet {
const stats = {
Expand All @@ -857,6 +881,7 @@ export class MockSpanner {
};
return protobuf.PartialResultSet.create({
stats,
precommitToken: precommitToken,
});
}

Expand Down Expand Up @@ -992,10 +1017,12 @@ export class MockSpanner {

this.simulateExecutionTime(this.streamingRead.name)
.then(() => {
let transactionKey;
if (call.request!.transaction) {
const fullTransactionId = `${call.request!.session}/transactions/${
call.request!.transaction.id
}`;
transactionKey = fullTransactionId;
if (this.abortedTransactions.has(fullTransactionId)) {
call.sendMetadata(new Metadata());
call.emit(
Expand All @@ -1019,29 +1046,37 @@ export class MockSpanner {
call.request!.session,
call.request!.transaction.begin,
);
const precommitToken = session?.multiplexed
? protobuf.MultiplexedSessionPrecommitToken.create({
precommitToken: Buffer.from('mock-precommit-token'),
seqNum: randomInt(1, 1000),
})
: null;
if (txn instanceof Error) {
call.sendMetadata(new Metadata());
call.emit('error', txn);
call.end();
return;
}
transactionKey = `${call.request!.session}/transactions/${txn.id.toString()}`;
if (res.type === ReadRequestResultType.RESULT_SET) {
call.sendMetadata(new Metadata());
(res.resultSet as protobuf.ResultSet).metadata!.transaction = txn;
(res.resultSet as protobuf.ResultSet).precommitToken =
precommitToken;
}
}

// get the current seqNum
const currentSeqNum = this.transactionSeqNum.get(transactionKey) || 0;
const nextSeqNum = currentSeqNum + 1;

// set the next SeqNum
this.transactionSeqNum.set(transactionKey, nextSeqNum);
const precommitToken = session?.multiplexed
? protobuf.MultiplexedSessionPrecommitToken.create({
precommitToken: Buffer.from('mock-precommit-token'),
seqNum: nextSeqNum,
})
: null;
let partialResultSets;
let resumeIndex;
switch (res.type) {
case ReadRequestResultType.RESULT_SET:
(res.resultSet as protobuf.ResultSet).precommitToken =
precommitToken;
if (Array.isArray(res.resultSet)) {
partialResultSets = res.resultSet;
} else {
Expand Down Expand Up @@ -1153,6 +1188,10 @@ export class MockSpanner {
session.name + '/transactions/' + transactionId;
const transaction = this.transactions.get(fullTransactionId);
if (transaction) {
// unique transaction key
const transactionKey = `${call.request.session}/transactions/${call.request.transactionId}`;
// delete the transaction key
this.transactionSeqNum.delete(transactionKey);
this.transactions.delete(fullTransactionId);
this.transactionOptions.delete(fullTransactionId);
callback(
Expand Down Expand Up @@ -1197,6 +1236,10 @@ export class MockSpanner {
const fullTransactionId = session.name + '/transactions/' + transactionId;
const transaction = this.transactions.get(fullTransactionId);
if (transaction) {
// unique transaction key
const transactionKey = `${call.request.session}/transactions/${call.request.transactionId}`;
// delete the key
this.transactionSeqNum.delete(transactionKey);
this.transactions.delete(fullTransactionId);
this.transactionOptions.delete(fullTransactionId);
callback(null, google.protobuf.Empty.create());
Expand Down Expand Up @@ -1273,13 +1316,18 @@ export class MockSpanner {
const transactionId = id.toString().padStart(12, '0');
const fullTransactionId = session.name + '/transactions/' + transactionId;
const readTimestamp = options && options.readOnly ? now() : undefined;
const precommitToken =
this.mutationOnly && session.multiplexed && options?.readWrite
? {
precommitToken: Buffer.from('mock-precommit-token'),
seqNum: randomInt(1, 1000),
}
: null;
let precommitToken;
if (this.mutationOnly && session.multiplexed && options?.readWrite) {
// get the current seqNum
const currentSeqNum = this.transactionSeqNum.get(fullTransactionId) || 0;
const nextSeqNum = currentSeqNum + 1;
// set the next seqNum
this.transactionSeqNum.set(fullTransactionId, nextSeqNum);
precommitToken = {
precommitToken: Buffer.from('mock-precommit-token'),
seqNum: nextSeqNum,
};
}
const transaction = protobuf.Transaction.create({
id: Buffer.from(transactionId),
readTimestamp,
Expand Down
3 changes: 1 addition & 2 deletions test/multiplexed-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,11 @@ describe('MultiplexedSession', () => {

it('should pass back the session and txn', done => {
const fakeTxn = new FakeTransaction() as unknown as Transaction;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where are we mocking session.!transaction method? Ideally this test should fail since we removed the assignment right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have removed the assignment but we are returning the transaction using transaction method defined in Session class.

session!.transaction((session!.parent as Database).queryOptions_),

fakeMuxSession.txn = fakeTxn;
sandbox.stub(multiplexedSession, '_acquire').resolves(fakeMuxSession);
multiplexedSession.getSession((err, session, txn) => {
assert.ifError(err);
assert.strictEqual(session, fakeMuxSession);
assert.strictEqual(txn, fakeTxn);
assert.deepStrictEqual(txn, fakeTxn);
done();
});
});
Expand Down
Loading
Loading