Skip to content

Commit ee24c6e

Browse files
skuruppurahul2393
andauthored
feat(spanner): support setting read lock mode (#1404)
Supports setting the read lock mode in R/W transactions at both the client level and at an individual transaction level. Co-authored-by: rahul2393 <irahul@google.com>
1 parent fc93792 commit ee24c6e

File tree

9 files changed

+436
-7
lines changed

9 files changed

+436
-7
lines changed

google/cloud/spanner_v1/batch.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ def commit(
149149
max_commit_delay=None,
150150
exclude_txn_from_change_streams=False,
151151
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
152+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED,
152153
timeout_secs=DEFAULT_RETRY_TIMEOUT_SECS,
153154
default_retry_delay=None,
154155
):
@@ -182,6 +183,11 @@ def commit(
182183
:param isolation_level:
183184
(Optional) Sets isolation level for the transaction.
184185
186+
:type read_lock_mode:
187+
:class:`google.cloud.spanner_v1.types.TransactionOptions.ReadWrite.ReadLockMode`
188+
:param read_lock_mode:
189+
(Optional) Sets the read lock mode for this transaction.
190+
185191
:type timeout_secs: int
186192
:param timeout_secs: (Optional) The maximum time in seconds to wait for the commit to complete.
187193
@@ -208,7 +214,9 @@ def commit(
208214
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
209215
)
210216
txn_options = TransactionOptions(
211-
read_write=TransactionOptions.ReadWrite(),
217+
read_write=TransactionOptions.ReadWrite(
218+
read_lock_mode=read_lock_mode,
219+
),
212220
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
213221
isolation_level=isolation_level,
214222
)

google/cloud/spanner_v1/database.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -882,6 +882,7 @@ def batch(
882882
max_commit_delay=None,
883883
exclude_txn_from_change_streams=False,
884884
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
885+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED,
885886
**kw,
886887
):
887888
"""Return an object which wraps a batch.
@@ -914,6 +915,11 @@ def batch(
914915
:param isolation_level:
915916
(Optional) Sets the isolation level for this transaction. This overrides any default isolation level set for the client.
916917
918+
:type read_lock_mode:
919+
:class:`google.cloud.spanner_v1.types.TransactionOptions.ReadWrite.ReadLockMode`
920+
:param read_lock_mode:
921+
(Optional) Sets the read lock mode for this transaction. This overrides any default read lock mode set for the client.
922+
917923
:rtype: :class:`~google.cloud.spanner_v1.database.BatchCheckout`
918924
:returns: new wrapper
919925
"""
@@ -924,6 +930,7 @@ def batch(
924930
max_commit_delay,
925931
exclude_txn_from_change_streams,
926932
isolation_level,
933+
read_lock_mode,
927934
**kw,
928935
)
929936

@@ -996,6 +1003,7 @@ def run_in_transaction(self, func, *args, **kw):
9961003
This does not exclude the transaction from being recorded in the change streams with
9971004
the DDL option `allow_txn_exclusion` being false or unset.
9981005
"isolation_level" sets the isolation level for the transaction.
1006+
"read_lock_mode" sets the read lock mode for the transaction.
9991007
10001008
:rtype: Any
10011009
:returns: The return value of ``func``.
@@ -1310,6 +1318,7 @@ def __init__(
13101318
max_commit_delay=None,
13111319
exclude_txn_from_change_streams=False,
13121320
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
1321+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED,
13131322
**kw,
13141323
):
13151324
self._database: Database = database
@@ -1325,6 +1334,7 @@ def __init__(
13251334
self._max_commit_delay = max_commit_delay
13261335
self._exclude_txn_from_change_streams = exclude_txn_from_change_streams
13271336
self._isolation_level = isolation_level
1337+
self._read_lock_mode = read_lock_mode
13281338
self._kw = kw
13291339

13301340
def __enter__(self):
@@ -1357,6 +1367,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
13571367
max_commit_delay=self._max_commit_delay,
13581368
exclude_txn_from_change_streams=self._exclude_txn_from_change_streams,
13591369
isolation_level=self._isolation_level,
1370+
read_lock_mode=self._read_lock_mode,
13601371
**self._kw,
13611372
)
13621373
finally:

google/cloud/spanner_v1/session.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,7 @@ def run_in_transaction(self, func, *args, **kw):
509509
This does not exclude the transaction from being recorded in the change streams with
510510
the DDL option `allow_txn_exclusion` being false or unset.
511511
"isolation_level" sets the isolation level for the transaction.
512+
"read_lock_mode" sets the read lock mode for the transaction.
512513
513514
:rtype: Any
514515
:returns: The return value of ``func``.
@@ -525,6 +526,7 @@ def run_in_transaction(self, func, *args, **kw):
525526
"exclude_txn_from_change_streams", None
526527
)
527528
isolation_level = kw.pop("isolation_level", None)
529+
read_lock_mode = kw.pop("read_lock_mode", None)
528530

529531
database = self._database
530532
log_commit_stats = database.log_commit_stats
@@ -549,6 +551,7 @@ def run_in_transaction(self, func, *args, **kw):
549551
txn.transaction_tag = transaction_tag
550552
txn.exclude_txn_from_change_streams = exclude_txn_from_change_streams
551553
txn.isolation_level = isolation_level
554+
txn.read_lock_mode = read_lock_mode
552555

553556
if self.is_multiplexed:
554557
txn._multiplexed_session_previous_transaction_id = (

google/cloud/spanner_v1/transaction.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ class Transaction(_SnapshotBase, _BatchBase):
6161
isolation_level: TransactionOptions.IsolationLevel = (
6262
TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED
6363
)
64+
read_lock_mode: TransactionOptions.ReadWrite.ReadLockMode = (
65+
TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED
66+
)
6467

6568
# Override defaults from _SnapshotBase.
6669
_multi_use: bool = True
@@ -89,7 +92,8 @@ def _build_transaction_options_pb(self) -> TransactionOptions:
8992

9093
merge_transaction_options = TransactionOptions(
9194
read_write=TransactionOptions.ReadWrite(
92-
multiplexed_session_previous_transaction_id=self._multiplexed_session_previous_transaction_id
95+
multiplexed_session_previous_transaction_id=self._multiplexed_session_previous_transaction_id,
96+
read_lock_mode=self.read_lock_mode,
9397
),
9498
exclude_txn_from_change_streams=self.exclude_txn_from_change_streams,
9599
isolation_level=self.isolation_level,
@@ -784,14 +788,20 @@ class BatchTransactionId:
784788
@dataclass
785789
class DefaultTransactionOptions:
786790
isolation_level: str = TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED
791+
read_lock_mode: str = (
792+
TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED
793+
)
787794
_defaultReadWriteTransactionOptions: Optional[TransactionOptions] = field(
788795
init=False, repr=False
789796
)
790797

791798
def __post_init__(self):
792799
"""Initialize _defaultReadWriteTransactionOptions automatically"""
793800
self._defaultReadWriteTransactionOptions = TransactionOptions(
794-
isolation_level=self.isolation_level
801+
read_write=TransactionOptions.ReadWrite(
802+
read_lock_mode=self.read_lock_mode,
803+
),
804+
isolation_level=self.isolation_level,
795805
)
796806

797807
@property

tests/unit/test__helpers.py

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -978,7 +978,10 @@ def test_default_none_and_merge_none(self):
978978

979979
def test_default_options_and_merge_none(self):
980980
default = TransactionOptions(
981-
isolation_level=TransactionOptions.IsolationLevel.REPEATABLE_READ
981+
isolation_level=TransactionOptions.IsolationLevel.REPEATABLE_READ,
982+
read_write=TransactionOptions.ReadWrite(
983+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.PESSIMISTIC,
984+
),
982985
)
983986
merge = None
984987
result = self._callFUT(default, merge)
@@ -988,7 +991,10 @@ def test_default_options_and_merge_none(self):
988991
def test_default_none_and_merge_options(self):
989992
default = None
990993
merge = TransactionOptions(
991-
isolation_level=TransactionOptions.IsolationLevel.SERIALIZABLE
994+
isolation_level=TransactionOptions.IsolationLevel.SERIALIZABLE,
995+
read_write=TransactionOptions.ReadWrite(
996+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
997+
),
992998
)
993999
expected = merge
9941000
result = self._callFUT(default, merge)
@@ -1044,6 +1050,67 @@ def test_default_isolation_and_merge_options_isolation_unspecified(self):
10441050
result = self._callFUT(default, merge)
10451051
self.assertEqual(result, expected)
10461052

1053+
def test_default_and_merge_read_lock_mode_options(self):
1054+
default = TransactionOptions(
1055+
read_write=TransactionOptions.ReadWrite(
1056+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.PESSIMISTIC,
1057+
),
1058+
)
1059+
merge = TransactionOptions(
1060+
read_write=TransactionOptions.ReadWrite(
1061+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
1062+
),
1063+
exclude_txn_from_change_streams=True,
1064+
)
1065+
expected = TransactionOptions(
1066+
read_write=TransactionOptions.ReadWrite(
1067+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
1068+
),
1069+
exclude_txn_from_change_streams=True,
1070+
)
1071+
result = self._callFUT(default, merge)
1072+
self.assertEqual(result, expected)
1073+
1074+
def test_default_read_lock_mode_and_merge_options(self):
1075+
default = TransactionOptions(
1076+
read_write=TransactionOptions.ReadWrite(
1077+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
1078+
),
1079+
)
1080+
merge = TransactionOptions(
1081+
read_write=TransactionOptions.ReadWrite(),
1082+
exclude_txn_from_change_streams=True,
1083+
)
1084+
expected = TransactionOptions(
1085+
read_write=TransactionOptions.ReadWrite(
1086+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
1087+
),
1088+
exclude_txn_from_change_streams=True,
1089+
)
1090+
result = self._callFUT(default, merge)
1091+
self.assertEqual(result, expected)
1092+
1093+
def test_default_read_lock_mode_and_merge_options_isolation_unspecified(self):
1094+
default = TransactionOptions(
1095+
read_write=TransactionOptions.ReadWrite(
1096+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
1097+
),
1098+
)
1099+
merge = TransactionOptions(
1100+
read_write=TransactionOptions.ReadWrite(
1101+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED,
1102+
),
1103+
exclude_txn_from_change_streams=True,
1104+
)
1105+
expected = TransactionOptions(
1106+
read_write=TransactionOptions.ReadWrite(
1107+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
1108+
),
1109+
exclude_txn_from_change_streams=True,
1110+
)
1111+
result = self._callFUT(default, merge)
1112+
self.assertEqual(result, expected)
1113+
10471114

10481115
class Test_interval(unittest.TestCase):
10491116
from google.protobuf.struct_pb2 import Value

tests/unit/test_batch.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ def _test_commit_with_options(
300300
max_commit_delay_in=None,
301301
exclude_txn_from_change_streams=False,
302302
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
303+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED,
303304
):
304305
now = datetime.datetime.utcnow().replace(tzinfo=UTC)
305306
now_pb = _datetime_to_pb_timestamp(now)
@@ -315,6 +316,7 @@ def _test_commit_with_options(
315316
max_commit_delay=max_commit_delay_in,
316317
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
317318
isolation_level=isolation_level,
319+
read_lock_mode=read_lock_mode,
318320
)
319321

320322
self.assertEqual(committed, now)
@@ -347,6 +349,10 @@ def _test_commit_with_options(
347349
single_use_txn.isolation_level,
348350
isolation_level,
349351
)
352+
self.assertEqual(
353+
single_use_txn.read_write.read_lock_mode,
354+
read_lock_mode,
355+
)
350356
req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1"
351357
self.assertEqual(
352358
metadata,
@@ -424,6 +430,25 @@ def test_commit_w_isolation_level(self):
424430
isolation_level=TransactionOptions.IsolationLevel.REPEATABLE_READ,
425431
)
426432

433+
def test_commit_w_read_lock_mode(self):
434+
request_options = RequestOptions(
435+
request_tag="tag-1",
436+
)
437+
self._test_commit_with_options(
438+
request_options=request_options,
439+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
440+
)
441+
442+
def test_commit_w_isolation_level_and_read_lock_mode(self):
443+
request_options = RequestOptions(
444+
request_tag="tag-1",
445+
)
446+
self._test_commit_with_options(
447+
request_options=request_options,
448+
isolation_level=TransactionOptions.IsolationLevel.REPEATABLE_READ,
449+
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.PESSIMISTIC,
450+
)
451+
427452
def test_context_mgr_already_committed(self):
428453
now = datetime.datetime.utcnow().replace(tzinfo=UTC)
429454
database = _Database()

tests/unit/test_client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ class TestClient(unittest.TestCase):
4646
},
4747
}
4848
DEFAULT_TRANSACTION_OPTIONS = DefaultTransactionOptions(
49-
isolation_level="SERIALIZABLE"
49+
isolation_level="SERIALIZABLE",
50+
read_lock_mode="PESSIMISTIC",
5051
)
5152

5253
def _get_target_class(self):

0 commit comments

Comments
 (0)