-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathtranscoders.py
More file actions
124 lines (95 loc) · 3.69 KB
/
transcoders.py
File metadata and controls
124 lines (95 loc) · 3.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""
Make sure to install the following for the example to work:
python3 -m pip install orjson msgpack
Must have Couchbase Python SDK v 3.2.2 or greater to use Transcoders
"""
import traceback
from typing import Any, Tuple
import orjson
import msgpack
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions, GetOptions, UpsertOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.exceptions import CouchbaseException, ValueFormatException
from couchbase.transcoder import RawJSONTranscoder, RawStringTranscoder, RawBinaryTranscoder, Transcoder
# Connect to the cluster with username/password credentials, then open the
# default collection of the "travel-sample" bucket used by every snippet below.
authenticator = PasswordAuthenticator("Administrator", "password")
cluster = Cluster.connect("couchbase://your-ip", ClusterOptions(authenticator))
bucket = cluster.bucket("travel-sample")
collection = bucket.default_collection()
# tag::raw_json_encode[]
# The document is serialized to JSON by the application (via orjson) and the
# resulting payload is upserted with the raw JSON transcoder.
user = {"name": "John Smith", "age": 27}
transcoder = RawJSONTranscoder()
json_payload = orjson.dumps(user)

try:
    collection.upsert(
        "john-smith",
        json_payload,
        UpsertOptions(transcoder=transcoder),
    )
except (ValueFormatException, CouchbaseException) as ex:
    # Report the failure without aborting the example.
    traceback.print_exc()
# end::raw_json_encode[]
# tag::raw_json_decode[]
# Read the document back with the same raw JSON transcoder and parse the
# returned value with orjson.
try:
    get_result = collection.get("john-smith", GetOptions(transcoder=transcoder))
except (ValueFormatException, CouchbaseException):
    traceback.print_exc()
else:
    # Only touch get_result when the read succeeded; after a failed get the
    # name would be unbound and mask the real error with a NameError.
    decoded = orjson.loads(get_result.value)
    assert decoded == user
# end::raw_json_decode[]
# tag::raw_string_transcoder[]
# Round-trip a plain string through the raw string transcoder.
transcoder = RawStringTranscoder()
input_str = "Hello, World!"

try:
    _ = collection.upsert(
        "key", input_str, UpsertOptions(transcoder=transcoder))
    get_result = collection.get("key", GetOptions(transcoder=transcoder))
except (ValueFormatException, CouchbaseException):
    traceback.print_exc()
else:
    # Verify only on success; otherwise get_result may still hold the result
    # of an earlier snippet and the comparison would be meaningless.
    assert get_result.value == input_str
# end::raw_string_transcoder[]
# tag::raw_binary_transcoder[]
# Round-trip a bytes value through the raw binary transcoder.
transcoder = RawBinaryTranscoder()
input_bytes = bytes("Hello, World!", "utf-8")

try:
    _ = collection.upsert(
        "key", input_bytes, UpsertOptions(transcoder=transcoder))
    get_result = collection.get("key", GetOptions(transcoder=transcoder))
except (ValueFormatException, CouchbaseException):
    traceback.print_exc()
else:
    # Verify only on success; otherwise get_result may still hold the result
    # of an earlier snippet and the comparison would be meaningless.
    assert get_result.value == input_bytes
# end::raw_binary_transcoder[]
# tag::create_custom_transcoder[]
class MessagePackTranscoder(Transcoder):
    """Custom transcoder that stores values as MessagePack.

    Values are serialized with ``msgpack.packb`` and returned together with a
    custom flags value; decoding refuses any payload whose flags differ from
    that value.
    """

    # Flags value returned with every encoded payload: 1 in the top byte
    # followed by the character codes of "M", "P" and "K".
    _CUSTOM_FLAGS = (1 << 24) | (ord("M") << 16) | (ord("P") << 8) | (ord("K") << 0)

    def encode_value(self, value: Any) -> Tuple[bytes, int]:
        """Serialize ``value`` with MessagePack.

        Args:
            value: The object to serialize.

        Returns:
            A tuple of the packed bytes and the custom flags value.

        Raises:
            Exception: Any error raised by ``msgpack.packb`` is printed and
                re-raised unchanged.
        """
        try:
            packed = msgpack.packb(value)
            return packed, self._CUSTOM_FLAGS
        except Exception as ex:
            # Implement custom exception handling
            print("Exception: {}".format(ex))
            raise

    def decode_value(self, value: bytes, flags: int) -> Any:
        """Deserialize a MessagePack payload produced by ``encode_value``.

        Args:
            value: The packed bytes to deserialize.
            flags: The flags accompanying the payload.

        Returns:
            The object produced by ``msgpack.unpackb`` (not raw bytes).

        Raises:
            ValueError: If ``flags`` does not equal the custom flags value.
            Exception: Any error raised by ``msgpack.unpackb`` is printed and
                re-raised unchanged.
        """
        # Reject payloads that were not written with this transcoder's flags.
        if flags != self._CUSTOM_FLAGS:
            raise ValueError("Unexpected flags value.")

        try:
            return msgpack.unpackb(value)
        except Exception as ex:
            # Implement custom exception handling
            print("Exception: {}".format(ex))
            raise
# end::create_custom_transcoder[]
# tag::use_custom_transcoder[]
# Round-trip a dict through the custom MessagePack transcoder.
transcoder = MessagePackTranscoder()
user = {"name": "John Smith", "age": 27}

try:
    _ = collection.upsert(
        "mpk_key", user, UpsertOptions(transcoder=transcoder))
    get_result = collection.get("mpk_key", GetOptions(transcoder=transcoder))
except (ValueFormatException, CouchbaseException):
    traceback.print_exc()
else:
    # Verify only on success; otherwise get_result may still hold the result
    # of an earlier snippet and the comparison would be meaningless.
    assert get_result.value == user
# end::use_custom_transcoder[]