Optimised resource transfer timings. Improved request/response timeout handling.

pull/8/head
Mark Qvist 2021-09-03 18:53:37 +02:00
rodzic d28c888d1c
commit 42a3d23e99
3 zmienionych plików z 22 dodań i 27 usunięć

Wyświetl plik

@ -562,6 +562,9 @@ class Link:
self.handle_response(request_id, response_data)
else:
RNS.log("Incoming response resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG)
for pending_request in self.pending_requests:
if pending_request.request_id == resource.request_id:
pending_request.request_timed_out(None)
def receive(self, packet):
self.watchdog_lock = True
@ -640,7 +643,7 @@ class Link:
request_id = RNS.ResourceAdvertisement.get_request_id(packet)
for pending_request in self.pending_requests:
if pending_request.request_id == request_id:
RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress)
RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress, request_id = request_id)
pending_request.response_size = RNS.ResourceAdvertisement.get_size(packet)
pending_request.response_transfer_size = RNS.ResourceAdvertisement.get_transfer_size(packet)
pending_request.started_at = time.time()
@ -911,9 +914,6 @@ class RequestReceipt():
self.started_at = time.time()
self.status = RequestReceipt.DELIVERED
self.__resource_response_timeout = time.time()+self.timeout
load_thread = threading.Thread(target=self.__resource_response_timeout_job)
load_thread.setDaemon(True)
load_thread.start()
else:
RNS.log("Sending request "+RNS.prettyhexrep(self.request_id)+" as resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG)
self.status = RequestReceipt.FAILED
@ -951,14 +951,6 @@ class RequestReceipt():
else:
resource.cancel()
def __resource_response_timeout_job(self):
while self.status == RequestReceipt.DELIVERED:
now = time.time()
if now > self.__resource_response_timeout:
self.request_timed_out(None)
time.sleep(0.1)
def response_received(self, response):
if not self.status == RequestReceipt.FAILED:

Wyświetl plik

@ -46,10 +46,11 @@ class Resource:
# bz2 before sending.
AUTO_COMPRESS_MAX_SIZE = MAX_EFFICIENT_SIZE
PART_TIMEOUT_FACTOR = 3
MAX_RETRIES = 5
SENDER_GRACE_TIME = 10
RETRY_GRACE_TIME = 0.25
PART_TIMEOUT_FACTOR = 4
PART_TIMEOUT_FACTOR_AFTER_RTT = 2
MAX_RETRIES = 5
SENDER_GRACE_TIME = 10
RETRY_GRACE_TIME = 0.25
HASHMAP_IS_NOT_EXHAUSTED = 0x00
HASHMAP_IS_EXHAUSTED = 0xFF
@ -66,11 +67,11 @@ class Resource:
CORRUPT = 0x08
@staticmethod
def accept(advertisement_packet, callback=None, progress_callback = None):
def accept(advertisement_packet, callback=None, progress_callback = None, request_id = None):
try:
adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext)
resource = Resource(None, advertisement_packet.link)
resource = Resource(None, advertisement_packet.link, request_id = request_id)
resource.status = Resource.TRANSFERRING
resource.flags = adv.f
@ -539,11 +540,16 @@ class Resource:
if self.req_resp == None:
self.req_resp = self.last_activity
rtt = self.req_resp-self.req_sent
if self.rtt == None:
self.rtt = rtt
self.watchdog_job()
elif self.rtt < rtt:
self.rtt = rtt
if rtt >= self.link.rtt:
self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR_AFTER_RTT
if self.rtt == None:
self.rtt = rtt
self.watchdog_job()
elif rtt < self.rtt:
self.rtt = max(self.rtt - self.rtt*0.05, rtt)
elif rtt > self.rtt:
self.rtt = min(self.rtt + self.rtt*0.05, rtt)
if not self.status == Resource.FAILED:
self.status = Resource.TRANSFERRING

Wyświetl plik

@ -525,13 +525,10 @@ class Transport:
# accordingly if we are.
if packet.transport_id != None and packet.packet_type != RNS.Packet.ANNOUNCE:
if packet.transport_id == Transport.identity.hash:
# TODO: Remove at some point
# RNS.log("Received packet in transport for "+RNS.prettyhexrep(packet.destination_hash)+" with matching transport ID, transporting it...", RNS.LOG_DEBUG)
if packet.destination_hash in Transport.destination_table:
next_hop = Transport.destination_table[packet.destination_hash][1]
remaining_hops = Transport.destination_table[packet.destination_hash][2]
# TODO: Remove at some point
# RNS.log("Next hop to destination is "+RNS.prettyhexrep(next_hop)+" with "+str(remaining_hops)+" hops remaining, transporting it.", RNS.LOG_DEBUG)
if remaining_hops > 1:
# Just increase hop count and transmit
new_raw = packet.raw[0:1]