Skip to content

Commit 5434347

Browse files
committed
feat(spanner): add support for multiplexed session for r/w transactions
1 parent 0875cd8 commit 5434347

File tree

5 files changed

+808
-278
lines changed

5 files changed

+808
-278
lines changed

observability-test/database.ts

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -913,21 +913,24 @@ describe('Database', () => {
913913
});
914914

915915
describe('getTransaction', () => {
916-
let fakePool: FakeSessionPool;
916+
let fakeSessionFactory: FakeSessionFactory;
917917
let fakeSession: FakeSession;
918918
let fakeTransaction: FakeTransaction;
919919

920920
let getSessionStub: sinon.SinonStub;
921921

922922
beforeEach(() => {
923-
fakePool = database.pool_;
923+
fakeSessionFactory = database.sessionFactory_;
924924
fakeSession = new FakeSession();
925925
fakeTransaction = new FakeTransaction(
926926
{} as google.spanner.v1.TransactionOptions.ReadWrite,
927927
);
928928

929929
getSessionStub = (
930-
sandbox.stub(fakePool, 'getSession') as sinon.SinonStub
930+
sandbox.stub(
931+
fakeSessionFactory,
932+
'getSessionForReadWrite',
933+
) as sinon.SinonStub
931934
).callsFake(callback => {
932935
callback(null, fakeSession, fakeTransaction);
933936
});
@@ -1464,23 +1467,26 @@ describe('Database', () => {
14641467
{} as google.spanner.v1.TransactionOptions.ReadWrite,
14651468
);
14661469

1467-
let pool: FakeSessionPool;
1470+
let fakeSessionFactory: FakeSessionFactory;
14681471

14691472
beforeEach(() => {
1470-
pool = database.pool_;
1473+
fakeSessionFactory = database.sessionFactory_;
14711474

1472-
(sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake(
1473-
callback => {
1474-
callback(null, SESSION, TRANSACTION);
1475-
},
1476-
);
1475+
(
1476+
sandbox.stub(
1477+
fakeSessionFactory,
1478+
'getSessionForReadWrite',
1479+
) as sinon.SinonStub
1480+
).callsFake(callback => {
1481+
callback(null, SESSION, TRANSACTION);
1482+
});
14771483
});
14781484

14791485
it('with error getting session', done => {
14801486
const fakeErr = new Error('getting a session');
14811487

1482-
(pool.getSession as sinon.SinonStub).callsFake(callback =>
1483-
callback(fakeErr),
1488+
(fakeSessionFactory.getSessionForReadWrite as sinon.SinonStub).callsFake(
1489+
callback => callback(fakeErr),
14841490
);
14851491

14861492
database.runTransaction(
@@ -1598,15 +1604,18 @@ describe('Database', () => {
15981604
{} as google.spanner.v1.TransactionOptions.ReadWrite,
15991605
);
16001606

1601-
let pool: FakeSessionPool;
1607+
let fakeSessionFactory: FakeSessionFactory;
16021608

16031609
beforeEach(() => {
1604-
pool = database.pool_;
1605-
(sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake(
1606-
callback => {
1607-
callback(null, SESSION, TRANSACTION);
1608-
},
1609-
);
1610+
fakeSessionFactory = database.sessionFactory_;
1611+
(
1612+
sandbox.stub(
1613+
fakeSessionFactory,
1614+
'getSessionForReadWrite',
1615+
) as sinon.SinonStub
1616+
).callsFake(callback => {
1617+
callback(null, SESSION, TRANSACTION);
1618+
});
16101619
});
16111620

16121621
it('with no error', async () => {

src/database.ts

Lines changed: 75 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -2285,26 +2285,28 @@ class Database extends common.GrpcServiceObject {
22852285
transactionTag: options.requestOptions?.transactionTag,
22862286
},
22872287
span => {
2288-
this.pool_.getSession((err, session, transaction) => {
2289-
if (!err) {
2290-
if (options.requestOptions) {
2291-
transaction!.requestOptions = Object.assign(
2292-
transaction!.requestOptions || {},
2293-
options.requestOptions,
2288+
this.sessionFactory_.getSessionForReadWrite(
2289+
(err, session, transaction) => {
2290+
if (!err) {
2291+
if (options.requestOptions) {
2292+
transaction!.requestOptions = Object.assign(
2293+
transaction!.requestOptions || {},
2294+
options.requestOptions,
2295+
);
2296+
}
2297+
transaction?.setReadWriteTransactionOptions(
2298+
options as RunTransactionOptions,
22942299
);
2300+
span.addEvent('Using Session', {'session.id': session?.id});
2301+
transaction!._observabilityOptions = this._observabilityOptions;
2302+
this._releaseOnEnd(session!, transaction!, span);
2303+
} else {
2304+
setSpanError(span, err);
22952305
}
2296-
transaction?.setReadWriteTransactionOptions(
2297-
options as RunTransactionOptions,
2298-
);
2299-
span.addEvent('Using Session', {'session.id': session?.id});
2300-
transaction!._observabilityOptions = this._observabilityOptions;
2301-
this._releaseOnEnd(session!, transaction!, span);
2302-
} else {
2303-
setSpanError(span, err);
2304-
}
2305-
span.end();
2306-
cb!(err as grpc.ServiceError | null, transaction);
2307-
});
2306+
span.end();
2307+
cb!(err as grpc.ServiceError | null, transaction);
2308+
},
2309+
);
23082310
},
23092311
);
23102312
}
@@ -3339,64 +3341,66 @@ class Database extends common.GrpcServiceObject {
33393341
transactionTag: options.requestOptions?.transactionTag,
33403342
},
33413343
span => {
3342-
this.pool_.getSession((err, session?, transaction?) => {
3343-
if (err) {
3344-
setSpanError(span, err);
3345-
}
3344+
this.sessionFactory_.getSessionForReadWrite(
3345+
(err, session?, transaction?) => {
3346+
if (err) {
3347+
setSpanError(span, err);
3348+
}
33463349

3347-
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
3348-
span.addEvent('No session available', {
3349-
'session.id': session?.id,
3350-
});
3351-
span.end();
3352-
this.runTransaction(options, runFn!);
3353-
return;
3354-
}
3350+
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
3351+
span.addEvent('No session available', {
3352+
'session.id': session?.id,
3353+
});
3354+
span.end();
3355+
this.runTransaction(options, runFn!);
3356+
return;
3357+
}
33553358

3356-
if (err) {
3357-
span.end();
3358-
runFn!(err as grpc.ServiceError);
3359-
return;
3360-
}
3359+
if (err) {
3360+
span.end();
3361+
runFn!(err as grpc.ServiceError);
3362+
return;
3363+
}
33613364

3362-
transaction!._observabilityOptions = this._observabilityOptions;
3365+
transaction!._observabilityOptions = this._observabilityOptions;
33633366

3364-
transaction!.requestOptions = Object.assign(
3365-
transaction!.requestOptions || {},
3366-
options.requestOptions,
3367-
);
3367+
transaction!.requestOptions = Object.assign(
3368+
transaction!.requestOptions || {},
3369+
options.requestOptions,
3370+
);
33683371

3369-
transaction!.setReadWriteTransactionOptions(
3370-
options as RunTransactionOptions,
3371-
);
3372+
transaction!.setReadWriteTransactionOptions(
3373+
options as RunTransactionOptions,
3374+
);
33723375

3373-
const release = () => {
3374-
this.pool_.release(session!);
3375-
span.end();
3376-
};
3376+
const release = () => {
3377+
this.sessionFactory_.release(session!);
3378+
span.end();
3379+
};
33773380

3378-
const runner = new TransactionRunner(
3379-
session!,
3380-
transaction!,
3381-
runFn!,
3382-
options,
3383-
);
3381+
const runner = new TransactionRunner(
3382+
session!,
3383+
transaction!,
3384+
runFn!,
3385+
options,
3386+
);
33843387

3385-
runner.run().then(release, err => {
3386-
setSpanError(span, err!);
3388+
runner.run().then(release, err => {
3389+
setSpanError(span, err!);
33873390

3388-
if (isSessionNotFoundError(err)) {
3389-
span.addEvent('No session available', {
3390-
'session.id': session?.id,
3391-
});
3392-
release();
3393-
this.runTransaction(options, runFn!);
3394-
} else {
3395-
setImmediate(runFn!, err);
3396-
release();
3397-
}
3398-
});
3399-
});
3391+
if (isSessionNotFoundError(err)) {
3392+
span.addEvent('No session available', {
3393+
'session.id': session?.id,
3394+
});
3395+
release();
3396+
this.runTransaction(options, runFn!);
3397+
} else {
3398+
setImmediate(runFn!, err);
3399+
release();
3400+
}
3401+
});
3402+
},
3403+
);
34003404
},
34013405
);
34023406
}
@@ -3481,7 +3485,9 @@ class Database extends common.GrpcServiceObject {
34813485
: {};
34823486

34833487
let sessionId = '';
3484-
const getSession = this.pool_.getSession.bind(this.pool_);
3488+
const getSession = this.sessionFactory_.getSessionForReadWrite.bind(
3489+
this.sessionFactory_,
3490+
);
34853491

34863492
return startTrace(
34873493
'Database.runTransactionAsync',
@@ -3519,7 +3525,7 @@ class Database extends common.GrpcServiceObject {
35193525
throw e;
35203526
} finally {
35213527
span.end();
3522-
this.pool_.release(session);
3528+
this.sessionFactory_.release(session);
35233529
}
35243530
} catch (e) {
35253531
if (isSessionNotFoundError(e as ServiceError)) {

0 commit comments

Comments
 (0)