2323import com .google .cloud .bigtable .data .v2 .models .sql .Struct ;
2424import com .google .cloud .bigtable .data .v2 .models .sql .StructReader ;
2525import com .google .common .base .Preconditions ;
26+ import com .google .protobuf .AbstractMessage ;
2627import com .google .protobuf .ByteString ;
28+ import com .google .protobuf .InvalidProtocolBufferException ;
29+ import com .google .protobuf .ProtocolMessageEnum ;
2730import java .time .Instant ;
2831import java .util .ArrayList ;
2932import java .util .Collections ;
3033import java .util .HashMap ;
3134import java .util .List ;
3235import java .util .Map ;
36+ import java .util .function .Function ;
3337
3438@ InternalApi
3539public abstract class AbstractProtoStructReader implements StructReader {
@@ -220,6 +224,12 @@ public <ElemType> List<ElemType> getList(int columnIndex, SqlType.Array<ElemType
220224 SqlType <?> actualType = getColumnType (columnIndex );
221225 checkNonNullOfType (columnIndex , arrayType , actualType , columnIndex );
222226 Value value = values ().get (columnIndex );
227+ // If the element type is proto/enum, we should use the user passed type, which contains the
228+ // schema. Otherwise, we should use the type from metadata.
229+ SqlType <?> elementType = arrayType .getElementType ();
230+ if (elementType .getCode () == SqlType .Code .PROTO || elementType .getCode () == SqlType .Code .ENUM ) {
231+ return (List <ElemType >) decodeValue (value , arrayType );
232+ }
223233 return (List <ElemType >) decodeValue (value , actualType );
224234 }
225235
@@ -231,6 +241,12 @@ public <ElemType> List<ElemType> getList(String columnName, SqlType.Array<ElemTy
231241 SqlType <?> actualType = getColumnType (columnIndex );
232242 checkNonNullOfType (columnIndex , arrayType , actualType , columnName );
233243 Value value = values ().get (columnIndex );
244+ // If the element type is proto/enum, we should use the user passed type, which contains the
245+ // schema. Otherwise, we should use the type from metadata.
246+ SqlType <?> elementType = arrayType .getElementType ();
247+ if (elementType .getCode () == SqlType .Code .PROTO || elementType .getCode () == SqlType .Code .ENUM ) {
248+ return (List <ElemType >) decodeValue (value , arrayType );
249+ }
234250 return (List <ElemType >) decodeValue (value , actualType );
235251 }
236252
@@ -241,6 +257,12 @@ public <K, V> Map<K, V> getMap(int columnIndex, SqlType.Map<K, V> mapType) {
241257 SqlType <?> actualType = getColumnType (columnIndex );
242258 checkNonNullOfType (columnIndex , mapType , actualType , columnIndex );
243259 Value value = values ().get (columnIndex );
260+ // If the value type is proto/enum, we should use the user passed type, which contains the
261+ // schema. Otherwise, we should use the type from metadata.
262+ SqlType <?> valueType = mapType .getValueType ();
263+ if (valueType .getCode () == SqlType .Code .PROTO || valueType .getCode () == SqlType .Code .ENUM ) {
264+ return (Map <K , V >) decodeValue (value , mapType );
265+ }
244266 return (Map <K , V >) decodeValue (value , actualType );
245267 }
246268
@@ -252,9 +274,61 @@ public <K, V> Map<K, V> getMap(String columnName, SqlType.Map<K, V> mapType) {
252274 SqlType <?> actualType = getColumnType (columnIndex );
253275 checkNonNullOfType (columnIndex , mapType , actualType , columnName );
254276 Value value = values ().get (columnIndex );
277+ // If the value type is proto/enum, we should use the user passed type, which contains the
278+ // schema. Otherwise, we should use the type from metadata.
279+ SqlType <?> valueType = mapType .getValueType ();
280+ if (valueType .getCode () == SqlType .Code .PROTO || valueType .getCode () == SqlType .Code .ENUM ) {
281+ return (Map <K , V >) decodeValue (value , mapType );
282+ }
255283 return (Map <K , V >) decodeValue (value , actualType );
256284 }
257285
286+ @ Override
287+ public <MsgType extends AbstractMessage > MsgType getProtoMessage (
288+ int columnIndex , MsgType message ) {
289+ // Note it is import that we use the user passed message object to decode, because the type in
290+ // the corresponding column metadata only have a message name and doesn't have schemas
291+ SqlType .Proto <MsgType > actualType = SqlType .protoOf (message );
292+ checkNonNullOfType (columnIndex , getColumnType (columnIndex ), actualType , columnIndex );
293+ Value value = values ().get (columnIndex );
294+ return (MsgType ) decodeValue (value , actualType );
295+ }
296+
297+ @ Override
298+ public <MsgType extends AbstractMessage > MsgType getProtoMessage (
299+ String columnName , MsgType message ) {
300+ int columnIndex = getColumnIndex (columnName );
301+ // Note it is import that we use the user passed message object to decode, because the type in
302+ // the corresponding column metadata only have a message name and doesn't have schemas
303+ SqlType .Proto <MsgType > actualType = SqlType .protoOf (message );
304+ checkNonNullOfType (columnIndex , getColumnType (columnIndex ), actualType , columnName );
305+ Value value = values ().get (columnIndex );
306+ return (MsgType ) decodeValue (value , actualType );
307+ }
308+
309+ @ Override
310+ public <EnumType extends ProtocolMessageEnum > EnumType getProtoEnum (
311+ int columnIndex , Function <Integer , EnumType > forNumber ) {
312+ // Note it is import that we use the user passed function to decode, because the type in
313+ // the corresponding column metadata only have an enum message name and doesn't have schemas
314+ SqlType .Enum <EnumType > actualType = SqlType .enumOf (forNumber );
315+ checkNonNullOfType (columnIndex , getColumnType (columnIndex ), actualType , columnIndex );
316+ Value value = values ().get (columnIndex );
317+ return (EnumType ) decodeValue (value , actualType );
318+ }
319+
320+ @ Override
321+ public <EnumType extends ProtocolMessageEnum > EnumType getProtoEnum (
322+ String columnName , Function <Integer , EnumType > forNumber ) {
323+ int columnIndex = getColumnIndex (columnName );
324+ // Note it is import that we use the user passed function to decode, because the type in
325+ // the corresponding column metadata only have an enum message name and doesn't have schemas
326+ SqlType .Enum <EnumType > actualType = SqlType .enumOf (forNumber );
327+ checkNonNullOfType (columnIndex , getColumnType (columnIndex ), actualType , columnName );
328+ Value value = values ().get (columnIndex );
329+ return (EnumType ) decodeValue (value , actualType );
330+ }
331+
258332 Object decodeValue (Value value , SqlType <?> type ) {
259333 if (value .getKindCase ().equals (KindCase .KIND_NOT_SET )) {
260334 return null ;
@@ -281,6 +355,15 @@ Object decodeValue(Value value, SqlType<?> type) {
281355 SqlType .Struct schema = (SqlType .Struct ) type ;
282356 // A struct value is represented as an array
283357 return ProtoStruct .create (schema , value .getArrayValue ());
358+ case PROTO :
359+ try {
360+ SqlType .Proto protoType = (SqlType .Proto ) type ;
361+ return protoType .getParserForType ().parseFrom (value .getBytesValue ());
362+ } catch (InvalidProtocolBufferException e ) {
363+ throw new IllegalStateException ("Unable to parse value to proto " + type , e );
364+ }
365+ case ENUM :
366+ return ((SqlType .Enum <?>) type ).getForNumber ().apply ((int ) value .getIntValue ());
284367 case ARRAY :
285368 ArrayList <Object > listBuilder = new ArrayList <>();
286369 SqlType .Array <?> arrayType = (SqlType .Array <?>) type ;
0 commit comments