glclient.pairing
"""Python wrappers around the native pairing clients.

Exposes two thin classes, ``NewDeviceClient`` and
``AttestationDeviceClient``, which forward calls to the native bindings and
decode the returned byte payloads into protobuf messages.
"""
from . import scheduler_pb2 as schedpb
from google.protobuf.wrappers_pb2 import StringValue
import glclient.glclient as native
from glclient.glclient import Credentials
from typing import Optional, Generator


class NewDeviceClient(object):
    """A Pairing Client for the "new device" flow."""

    def __init__(self, creds: Credentials, uri: Optional[str] = None):
        # All actual work is delegated to the native binding.
        self._inner = native.NewDeviceClient(creds=creds, uri=uri)

    def _recv(self, m):
        """Decode one tagged message and yield the decoded value.

        ``m[0]`` is the type tag and ``m[1:]`` the serialized payload.
        Tag 1 decodes to ``schedpb.PairDeviceResponse``; tag 2 is parsed as
        a protobuf ``StringValue`` and its ``value`` is yielded.

        Raises:
            ValueError: if the tag is not one of the known types. Because
                this is a generator, the error surfaces on iteration.
        """
        decoders = {
            1: schedpb.PairDeviceResponse,
            2: str,
        }

        tag, payload = m[0], m[1:]
        decoder = decoders.get(tag)

        if decoder is None:
            raise ValueError(f"Unknown message type {tag}")

        if decoder is str:
            # Strings arrive wrapped in a StringValue message.
            wrapper = StringValue()
            wrapper.ParseFromString(bytes(payload))
            decoded = wrapper.value
        else:
            decoded = decoder.FromString(bytes(payload))

        yield decoded

    def pair_device(
        self, name: str, description: str, restrictions: str
    ) -> Generator[schedpb.PairDeviceResponse, None, None]:
        """Start a new pairing session and yield each message as it arrives.

        The first message is expected to be the "QRData" that must be
        presented to the attestation device; the second is either the
        pairing response or an error.

        NOTE(review): the annotation lists only ``PairDeviceResponse``, but
        tag-2 messages are yielded as ``str`` by ``_recv`` -- confirm whether
        the annotation should be widened.
        """
        stream = self._inner.pair_device(name, description, restrictions)
        for raw in stream:
            yield from self._recv(raw)


class AttestationDeviceClient(object):
    """Thin wrapper around ``native.AttestationDeviceClient``."""

    def __init__(self, creds: Credentials, uri: Optional[str] = None):
        self.inner = native.AttestationDeviceClient(creds=creds, uri=uri)

    def get_pairing_data(self, device_id: str) -> schedpb.GetPairingDataResponse:
        """Fetch pairing data for ``device_id`` and decode the reply."""
        raw = self.inner.get_pairing_data(device_id=device_id)
        return schedpb.GetPairingDataResponse.FromString(bytes(raw))

    def approve_pairing(self, device_id: str, device_name: str, restrs: str):
        """Forward the approval request to the native client unchanged."""
        self.inner.approve_pairing(device_id, device_name, restrs)

    def verify_pairing_data(self, data: schedpb.GetPairingDataRequest):
        """Serialize ``data`` and hand it to the native client for verification.

        NOTE(review): annotated as ``GetPairingDataRequest`` while
        ``get_pairing_data`` returns ``GetPairingDataResponse`` -- confirm
        which message type is intended here.
        """
        verify = self.inner.verify_pairing_data
        verify(data.SerializeToString())
class
NewDeviceClient:
class NewDeviceClient(object):
    """A Pairing Client for the "new device" flow."""

    def __init__(self, creds: Credentials, uri: Optional[str] = None):
        # All actual work is delegated to the native binding.
        self._inner = native.NewDeviceClient(creds=creds, uri=uri)

    def _recv(self, m):
        """Decode one tagged message and yield the decoded value.

        ``m[0]`` is the type tag and ``m[1:]`` the serialized payload.
        Tag 1 decodes to ``schedpb.PairDeviceResponse``; tag 2 is parsed as
        a protobuf ``StringValue`` and its ``value`` is yielded.

        Raises:
            ValueError: if the tag is not one of the known types. Because
                this is a generator, the error surfaces on iteration.
        """
        decoders = {
            1: schedpb.PairDeviceResponse,
            2: str,
        }

        tag, payload = m[0], m[1:]
        decoder = decoders.get(tag)

        if decoder is None:
            raise ValueError(f"Unknown message type {tag}")

        if decoder is str:
            # Strings arrive wrapped in a StringValue message.
            wrapper = StringValue()
            wrapper.ParseFromString(bytes(payload))
            decoded = wrapper.value
        else:
            decoded = decoder.FromString(bytes(payload))

        yield decoded

    def pair_device(
        self, name: str, description: str, restrictions: str
    ) -> Generator[schedpb.PairDeviceResponse, None, None]:
        """Start a new pairing session and yield each message as it arrives.

        The first message is expected to be the "QRData" that must be
        presented to the attestation device; the second is either the
        pairing response or an error.

        NOTE(review): the annotation lists only ``PairDeviceResponse``, but
        tag-2 messages are yielded as ``str`` by ``_recv`` -- confirm whether
        the annotation should be widened.
        """
        stream = self._inner.pair_device(name, description, restrictions)
        for raw in stream:
            yield from self._recv(raw)
A Pairing Client for the "new device" flow.
def
pair_device( self, name: str, description: str, restrictions: str) -> Generator[glclient.scheduler_pb2.PairDeviceResponse, NoneType, NoneType]:
def pair_device(
    self, name: str, description: str, restrictions: str
) -> Generator[schedpb.PairDeviceResponse, None, None]:
    """Start a new pairing session and yield each message as it arrives.

    The first message is expected to be the "QRData" that must be
    presented to the attestation device; the second is either the
    pairing response or an error.

    Each raw message produced by the native client is decoded through
    ``self._recv`` before being yielded.
    """
    stream = self._inner.pair_device(name, description, restrictions)
    for raw in stream:
        yield from self._recv(raw)
Starts a new pairing session and yields each message as it arrives.
First message would be the "QRData" data that must be presented to the attestation device.
Second message is either the pairing response or an error.
class
AttestationDeviceClient:
class AttestationDeviceClient(object):
    """Thin wrapper around ``native.AttestationDeviceClient``."""

    def __init__(self, creds: Credentials, uri: Optional[str] = None):
        self.inner = native.AttestationDeviceClient(creds=creds, uri=uri)

    def get_pairing_data(self, device_id: str) -> schedpb.GetPairingDataResponse:
        """Fetch pairing data for ``device_id`` and decode the reply."""
        raw = self.inner.get_pairing_data(device_id=device_id)
        return schedpb.GetPairingDataResponse.FromString(bytes(raw))

    def approve_pairing(self, device_id: str, device_name: str, restrs: str):
        """Forward the approval request to the native client unchanged."""
        self.inner.approve_pairing(device_id, device_name, restrs)

    def verify_pairing_data(self, data: schedpb.GetPairingDataRequest):
        """Serialize ``data`` and hand it to the native client for verification.

        NOTE(review): annotated as ``GetPairingDataRequest`` while
        ``get_pairing_data`` returns ``GetPairingDataResponse`` -- confirm
        which message type is intended here.
        """
        verify = self.inner.verify_pairing_data
        verify(data.SerializeToString())