From 212518a34530a2f4784e4f6cfc65add4d4bf7f36 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 21 Aug 2021 14:52:31 +0200 Subject: [PATCH] Implemented requests and responses of arbitrary sizes using resources. --- Examples/Filetransfer.py | 1 - Examples/Request.py | 2 +- RNS/Destination.py | 2 +- RNS/Link.py | 201 ++++++++++++++++++++++++++------------- RNS/Resource.py | 87 +++++++++++++---- RNS/__init__.py | 2 +- 6 files changed, 206 insertions(+), 89 deletions(-) diff --git a/Examples/Filetransfer.py b/Examples/Filetransfer.py index ca94046..5edff29 100644 --- a/Examples/Filetransfer.py +++ b/Examples/Filetransfer.py @@ -502,7 +502,6 @@ def download_concluded(resource): saved_filename = current_filename - if resource.status == RNS.Resource.COMPLETE: counter = 0 while os.path.isfile(saved_filename): diff --git a/Examples/Request.py b/Examples/Request.py index 228dfae..45d6089 100644 --- a/Examples/Request.py +++ b/Examples/Request.py @@ -23,7 +23,7 @@ APP_NAME = "example_utilities" # A reference to the latest client link that connected latest_client_link = None -def random_text_generator(path, data, request_id, remote_identity_hash, requested_at): +def random_text_generator(path, data, request_id, remote_identity, requested_at): RNS.log("Generating response to request "+RNS.prettyhexrep(request_id)) texts = ["They looked up", "On each full moon", "Becky was upset", "I’ll stay away from it", "The pet shop stocks everything"] return texts[random.randint(0, len(texts)-1)] diff --git a/RNS/Destination.py b/RNS/Destination.py index cdf4499..7f9be66 100755 --- a/RNS/Destination.py +++ b/RNS/Destination.py @@ -220,7 +220,7 @@ class Destination: Registers a request handler. :param path: The path for the request handler to be registered. - :param response_generator: A function or method with the signature *response_generator(path, data, request_id, remote_identity_hash, requested_at)* to be called. Whatever this funcion returns will be sent as a response to the requester. If the function returns ``None``, no response will be sent. + :param response_generator: A function or method with the signature *response_generator(path, data, request_id, remote_identity, requested_at)* to be called. Whatever this funcion returns will be sent as a response to the requester. If the function returns ``None``, no response will be sent. :param allow: One of ``RNS.Destination.ALLOW_NONE``, ``RNS.Destination.ALLOW_ALL`` or ``RNS.Destination.ALLOW_LIST``. If ``RNS.Destination.ALLOW_LIST`` is set, the request handler will only respond to requests for identified peers in the supplied list. :param allowed_list: A list of *bytes-like* :ref:`RNS.Identity` hashes. :raises: ``ValueError`` if any of the supplied arguments are invalid. diff --git a/RNS/Link.py b/RNS/Link.py index 995dc6e..f959ea7 100644 --- a/RNS/Link.py +++ b/RNS/Link.py @@ -280,10 +280,25 @@ class Link: if len(packed_request) <= Link.MDU: request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST) - return RequestReceipt(self, request_packet.send(), response_callback, failed_callback) + + return RequestReceipt( + self, + packet_receipt = request_packet.send(), + response_callback = response_callback, + failed_callback = failed_callback + ) + else: - # TODO: Implement sending requests as Resources - raise IOError("Request size of "+str(len(packed_request))+" exceeds MDU of "+str(Link.MDU)+" bytes") + request_id = RNS.Identity.truncated_hash(packed_request) + RNS.log("Sending request "+RNS.prettyhexrep(request_id)+" as resource.", RNS.LOG_DEBUG) + request_resource = RNS.Resource(packed_request, self, request_id = request_id, is_response = False) + + return RequestReceipt( + self, + resource = request_resource, + response_callback = response_callback, + failed_callback = failed_callback + ) def rtt_packet(self, packet): @@ -446,6 +461,77 @@ class Link: keepalive_packet.send() self.had_outbound() + def handle_request(self, request_id, unpacked_request): + requested_at = unpacked_request[0] + path_hash = unpacked_request[1] + request_data = unpacked_request[2] + + if path_hash in self.destination.request_handlers: + request_handler = self.destination.request_handlers[path_hash] + path = request_handler[0] + response_generator = request_handler[1] + allow = request_handler[2] + allowed_list = request_handler[3] + + allowed = False + if not allow == RNS.Destination.ALLOW_NONE: + if allow == RNS.Destination.ALLOW_LIST: + if self.__remote_identity.hash in allowed_list: + allowed = True + elif allow == RNS.Destination.ALLOW_ALL: + allowed = True + + if allowed: + RNS.log("Handling request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_DEBUG) + response = response_generator(path, request_data, request_id, self.__remote_identity, requested_at) + if response != None: + packed_response = umsgpack.packb([request_id, response]) + + if len(packed_response) <= Link.MDU: + RNS.Packet(self, packed_response, RNS.Packet.DATA, context = RNS.Packet.RESPONSE).send() + else: + response_resource = RNS.Resource(packed_response, self, request_id = request_id, is_response = True) + else: + identity_string = RNS.prettyhexrep(self.get_remote_identity()) if self.get_remote_identity() != None else "" + RNS.log("Request "+RNS.prettyhexrep(request_id)+" from "+identity_string+" not allowed for: "+str(path), RNS.LOG_DEBUG) + + def handle_response(self, request_id, response_data): + remove = None + for pending_request in self.pending_requests: + if pending_request.request_id == request_id: + remove = pending_request + try: + pending_request.response_received(response_data) + except Exception as e: + RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR) + + break + + if remove != None: + self.pending_requests.remove(remove) + + def request_resource_concluded(self, resource): + if resource.status == RNS.Resource.COMPLETE: + packed_request = resource.data.read() + unpacked_request = umsgpack.unpackb(packed_request) + request_id = RNS.Identity.truncated_hash(packed_request) + request_data = unpacked_request + + self.handle_request(request_id, request_data) + else: + RNS.log("Incoming request resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG) + + def response_resource_concluded(self, resource): + if resource.status == RNS.Resource.COMPLETE: + packed_response = resource.data.read() + unpacked_response = umsgpack.unpackb(packed_response) + request_id = unpacked_response[0] + response_data = unpacked_response[1] + + self.handle_response(request_id, response_data) + else: + RNS.log("Incoming response resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG) + def receive(self, packet): self.watchdog_lock = True if not self.status == Link.CLOSED and not (self.initiator and packet.context == RNS.Packet.KEEPALIVE and packet.data == bytes([0xFF])): @@ -493,64 +579,19 @@ class Link: request_id = packet.getTruncatedHash() packed_request = self.decrypt(packet.data) unpacked_request = umsgpack.unpackb(packed_request) - requested_at = unpacked_request[0] - path_hash = unpacked_request[1] - request_data = unpacked_request[2] - - if path_hash in self.destination.request_handlers: - request_handler = self.destination.request_handlers[path_hash] - path = request_handler[0] - response_generator = request_handler[1] - allow = request_handler[2] - allowed_list = request_handler[3] - - allowed = False - if not allow == RNS.Destination.ALLOW_NONE: - if allow == RNS.Destination.ALLOW_LIST: - if self.__remote_identity in allowed_list: - allowed = True - elif allow == RNS.Destination.ALLOW_ALL: - allowed = True - - if allowed: - response = response_generator(path, request_data, request_id, self.__remote_identity, requested_at) - if response != None: - packed_response = umsgpack.packb([request_id, True, response]) - - if len(packed_response) <= Link.MDU: - RNS.Packet(self, packed_response, RNS.Packet.DATA, context = RNS.Packet.RESPONSE).send() - else: - # TODO: Implement transfer as resource - packed_response = umsgpack.packb([request_id, False, None]) - raise Exception("Response transfer as resource not implemented") - + self.handle_request(request_id, unpacked_request) except Exception as e: RNS.log("Error occurred while handling request. The contained exception was: "+str(e), RNS.LOG_ERROR) elif packet.context == RNS.Packet.RESPONSE: - packed_response = self.decrypt(packet.data) - unpacked_response = umsgpack.unpackb(packed_response) - request_id = unpacked_response[0] - - if unpacked_response[1] == True: - remove = None - for pending_request in self.pending_requests: - if pending_request.request_id == request_id: - response_data = unpacked_response[2] - remove = pending_request - try: - pending_request.response_received(response_data) - except Exception as e: - RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR) - - break - - if remove != None: - self.pending_requests.remove(remove) - - else: - # TODO: Implement receiving responses as Resources - raise Exception("Response transfer as resource not implemented") + try: + packed_response = self.decrypt(packet.data) + unpacked_response = umsgpack.unpackb(packed_response) + request_id = unpacked_response[0] + response_data = unpacked_response[1] + self.handle_response(request_id, response_data) + except Exception as e: + RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR) elif packet.context == RNS.Packet.LRRTT: if not self.initiator: @@ -561,7 +602,12 @@ class Link: elif packet.context == RNS.Packet.RESOURCE_ADV: packet.plaintext = self.decrypt(packet.data) - if self.resource_strategy == Link.ACCEPT_NONE: + + if RNS.ResourceAdvertisement.is_request(packet): + RNS.Resource.accept(packet, callback=self.request_resource_concluded) + elif RNS.ResourceAdvertisement.is_response(packet): + RNS.Resource.accept(packet, callback=self.response_resource_concluded) + elif self.resource_strategy == Link.ACCEPT_NONE: pass elif self.resource_strategy == Link.ACCEPT_APP: if self.callbacks.resource != None: @@ -785,8 +831,18 @@ class RequestReceipt(): DELIVERED = 0x02 READY = 0x03 - def __init__(self, link, packet_receipt, response_callback = None, failed_callback = None): - self.hash = packet_receipt.truncated_hash + def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None): + self.packet_receipt = packet_receipt + self.resource = resource + + if self.packet_receipt != None: + self.hash = packet_receipt.truncated_hash + self.packet_receipt.set_timeout_callback(self.request_timed_out) + + elif self.resource != None: + self.hash = resource.request_id + resource.set_callback(self.request_resource_concluded) + self.link = link self.request_id = self.hash @@ -800,11 +856,19 @@ class RequestReceipt(): self.callbacks.response = response_callback self.callbacks.failed = failed_callback - self.packet_receipt = packet_receipt - self.packet_receipt.set_timeout_callback(self.request_timed_out) - self.link.pending_requests.append(self) + def request_resource_concluded(self, resource): + if resource.status == RNS.Resource.COMPLETE: + RNS.log("Request "+RNS.prettyhexrep(self.request_id)+" successfully sent as resource.", RNS.LOG_DEBUG) + else: + RNS.log("Sending request "+RNS.prettyhexrep(self.request_id)+" as resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG) + self.status = RequestReceipt.FAILED + self.concluded_at = time.time() + self.link.pending_requests.remove(self) + + if self.callbacks.failed != None: + self.callbacks.failed(self) def request_timed_out(self, packet_receipt): self.status = RequestReceipt.FAILED @@ -817,11 +881,12 @@ class RequestReceipt(): def response_received(self, response): self.response = response - self.packet_receipt.status = RNS.PacketReceipt.DELIVERED - self.packet_receipt.proved = True - self.packet_receipt.concluded_at = time.time() - if self.packet_receipt.callbacks.delivery != None: - self.packet_receipt.callbacks.delivery(self) + if self.packet_receipt != None: + self.packet_receipt.status = RNS.PacketReceipt.DELIVERED + self.packet_receipt.proved = True + self.packet_receipt.concluded_at = time.time() + if self.packet_receipt.callbacks.delivery != None: + self.packet_receipt.callbacks.delivery(self) if self.callbacks.response != None: self.callbacks.response(self) diff --git a/RNS/Resource.py b/RNS/Resource.py index 4dc4c22..511d496 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -21,6 +21,8 @@ class Resource: :param progress_callback: A *callable* with the signature *callback(resource)*. Will be called whenever the resource transfer progress is updated. :param segment_index: Internal use, ignore. :param original_hash: Internal use, ignore. + :param is_request: Internal use, ignore. + :param is_response: Internal use, ignore. """ WINDOW_FLEXIBILITY = 4 WINDOW_MIN = 1 @@ -119,7 +121,8 @@ class Resource: resource.link.register_incoming_resource(resource) RNS.log("Accepting resource advertisement for "+RNS.prettyhexrep(resource.hash), RNS.LOG_DEBUG) - resource.link.callbacks.resource_started(resource) + if resource.link.callbacks.resource_started != None: + resource.link.callbacks.resource_started(resource) resource.hashmap_update(0, resource.hashmap_raw) @@ -133,7 +136,7 @@ class Resource: # Create a resource for transmission to a remote destination # The data passed can be either a bytes-array or a file opened # in binary read mode. - def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None, segment_index = 1, original_hash = None): + def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None, segment_index = 1, original_hash = None, request_id = None, is_response = False): data_size = None resource_data = None if hasattr(data, "read"): @@ -188,6 +191,8 @@ class Resource: self.__watchdog_job_id = 0 self.__progress_callback = progress_callback self.rtt = None + self.request_id = request_id + self.is_response = is_response self.receiver_min_consecutive_height = 0 @@ -250,6 +255,7 @@ class Resource: self.random_hash = RNS.Identity.get_random_hash()[:Resource.RANDOM_HASH_SIZE] self.hash = RNS.Identity.full_hash(data+self.random_hash) + self.truncated_hash = RNS.Identity.truncated_hash(data+self.random_hash) self.expected_proof = RNS.Identity.full_hash(data+self.hash) if original_hash == None: @@ -739,6 +745,9 @@ class Resource: self.link.resource_concluded(self) self.callback(self) + def set_callback(self, callback): + self.callback = callback + def progress_callback(self, callback): self.__progress_callback = callback @@ -767,24 +776,63 @@ class Resource: class ResourceAdvertisement: - HASHMAP_MAX_LEN = 73 + HASHMAP_MAX_LEN = 70 COLLISION_GUARD_SIZE = 2*Resource.WINDOW_MAX+HASHMAP_MAX_LEN - def __init__(self, resource=None): + @staticmethod + def is_request(advertisement_packet): + adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) + if adv.q != None and adv.u: + return True + else: + return False + + + @staticmethod + def is_response(advertisement_packet): + adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) + + if adv.q != None and adv.p: + return True + else: + return False + + + @staticmethod + def get_request_id(advertisement_packet): + adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) + return adv.q + + + def __init__(self, resource=None, request_id=None, is_response=False): if resource != None: - self.t = resource.size # Transfer size - self.d = resource.total_size # Total uncompressed data size - self.n = len(resource.parts) # Number of parts - self.h = resource.hash # Resource hash - self.r = resource.random_hash # Resource random hash - self.o = resource.original_hash # First-segment hash - self.m = resource.hashmap # Resource hashmap - self.c = resource.compressed # Compression flag - self.e = resource.encrypted # Encryption flag - self.s = resource.split # Split flag - self.i = resource.segment_index # Segment index - self.l = resource.total_segments # Total segments - self.f = 0x00 | self.s << 2 | self.c << 1 | self.e # Flags + self.t = resource.size # Transfer size + self.d = resource.total_size # Total uncompressed data size + self.n = len(resource.parts) # Number of parts + self.h = resource.hash # Resource hash + self.r = resource.random_hash # Resource random hash + self.o = resource.original_hash # First-segment hash + self.m = resource.hashmap # Resource hashmap + self.c = resource.compressed # Compression flag + self.e = resource.encrypted # Encryption flag + self.s = resource.split # Split flag + self.i = resource.segment_index # Segment index + self.l = resource.total_segments # Total segments + self.q = resource.request_id # ID of associated request + self.u = False # Is request flag + self.p = False # Is response flag + + if self.q != None: + if not resource.is_response: + self.u = True + self.p = False + else: + self.u = False + self.p = True + + # Flags + self.f = 0x00 | self.p << 4 | self.u << 3 | self.s << 2 | self.c << 1 | self.e + def pack(self, segment=0): hashmap_start = segment*ResourceAdvertisement.HASHMAP_MAX_LEN @@ -803,12 +851,14 @@ class ResourceAdvertisement: "o": self.o, # Original hash "i": self.i, # Segment index "l": self.l, # Total segments + "q": self.q, # Request ID "f": self.f, # Resource flags "m": hashmap } return umsgpack.packb(dictionary) + @staticmethod def unpack(data): dictionary = umsgpack.unpackb(data) @@ -824,8 +874,11 @@ class ResourceAdvertisement: adv.f = dictionary["f"] adv.i = dictionary["i"] adv.l = dictionary["l"] + adv.q = dictionary["q"] adv.e = True if (adv.f & 0x01) == 0x01 else False adv.c = True if ((adv.f >> 1) & 0x01) == 0x01 else False adv.s = True if ((adv.f >> 2) & 0x01) == 0x01 else False + adv.u = True if ((adv.f >> 3) & 0x01) == 0x01 else False + adv.p = True if ((adv.f >> 4) & 0x01) == 0x01 else False return adv \ No newline at end of file diff --git a/RNS/__init__.py b/RNS/__init__.py index df74aa7..e256f1e 100755 --- a/RNS/__init__.py +++ b/RNS/__init__.py @@ -14,7 +14,7 @@ from .Transport import Transport from .Destination import Destination from .Packet import Packet from .Packet import PacketReceipt -from .Resource import Resource +from .Resource import Resource, ResourceAdvertisement modules = glob.glob(os.path.dirname(__file__)+"/*.py") __all__ = [ os.path.basename(f)[:-3] for f in modules if not f.endswith('__init__.py')]