Transport handling of announces and path requests for shared instance

pull/4/head
Mark Qvist 2020-05-13 20:33:10 +02:00
rodzic 6903c7a2f6
commit e0e1868e50
4 zmienionych plików z 178 dodań i 54 usunięć

Wyświetl plik

@ -33,7 +33,6 @@ class Identity:
@staticmethod
def remember(packet_hash, destination_hash, public_key, app_data = None):
RNS.log("Remembering "+RNS.prettyhexrep(destination_hash), RNS.LOG_VERBOSE)
Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data]
@ -108,7 +107,7 @@ class Identity:
if announced_identity.pub != None and announced_identity.validate(signature, signed_data):
RNS.Identity.remember(packet.getHash(), destination_hash, public_key)
RNS.log("Stored valid announce from "+RNS.prettyhexrep(destination_hash), RNS.LOG_INFO)
RNS.log("Stored valid announce from "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG)
del announced_identity
return True
else:

Wyświetl plik

@ -44,6 +44,8 @@ class LocalClientInterface(Interface):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.target_ip, self.target_port))
self.is_connected_to_shared_instance = True
self.owner = owner
self.online = True
self.writing = False
@ -118,9 +120,13 @@ class LocalClientInterface(Interface):
self.online = False
self.OUT = False
self.IN = False
if self in RNS.Transport.interfaces:
RNS.Transport.interfaces.remove(self)
if self in RNS.Transport.local_client_interfaces:
RNS.Transport.local_client_interfaces.remove(self)
def __str__(self):
return "LocalInterface["+str(self.target_port)+"]"
@ -144,6 +150,8 @@ class LocalServerInterface(Interface):
return createHandler
self.owner = owner
self.is_local_shared_instance = True
address = (self.bind_ip, self.bind_port)
self.server = ThreadingTCPServer(address, handlerFactory(self.incoming_connection))
@ -162,6 +170,7 @@ class LocalServerInterface(Interface):
spawned_interface.parent_interface = self
RNS.log("Accepting new connection to shared instance: "+str(spawned_interface), RNS.LOG_VERBOSE)
RNS.Transport.interfaces.append(spawned_interface)
RNS.Transport.local_client_interfaces.append(spawned_interface)
spawned_interface.read_loop()
def processOutgoing(self, data):

Wyświetl plik

@ -159,9 +159,6 @@ class Reticulum:
self.start_local_interface()
if self.is_shared_instance or self.is_standalone_instance:
# TODO: Remove
RNS.log("Starting local interfaces...")
interface_names = []
for name in self.config["interfaces"]:
if not name in interface_names:

Wyświetl plik

@ -35,7 +35,7 @@ class Transport:
# various situations
LOCAL_REBROADCASTS_MAX = 2 # How many local rebroadcasts of an announce is allowed
PATH_REQUEST_GRACE = 0.25 # Grace time before a path announcement is made, allows directly reachable peers to respond first
PATH_REQUEST_GRACE = 0.35 # Grace time before a path announcement is made, allows directly reachable peers to respond first
PATH_REQUEST_RW = 2 # Path request random window
LINK_TIMEOUT = RNS.Link.KEEPALIVE * 2
@ -50,11 +50,17 @@ class Transport:
packet_hashlist = [] # A list of packet hashes for duplicate detection
receipts = [] # Receipts of all outgoing packets for proof processing
# Interfaces for communicating with
# local clients connected to a shared
# Reticulum instance
local_client_interfaces = []
# TODO: "destination_table" should really be renamed to "path_table"
announce_table = {} # A table for storing announces currently waiting to be retransmitted
destination_table = {} # A lookup table containing the next hop to a given destination
reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies
link_table = {} # A lookup table containing hops for links
announce_table = {} # A table for storing announces currently waiting to be retransmitted
destination_table = {} # A lookup table containing the next hop to a given destination
reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies
link_table = {} # A lookup table containing hops for links
held_announces = {} # A table containing temporarily held announce-table entries
jobs_locked = False
jobs_running = False
@ -168,35 +174,60 @@ class Transport:
Transport.receipts_last_checked = time.time()
if RNS.Reticulum.transport_enabled():
# Process announces needing retransmission
if time.time() > Transport.announces_last_checked+Transport.announces_check_interval:
for destination_hash in Transport.announce_table:
announce_entry = Transport.announce_table[destination_hash]
if announce_entry[2] > Transport.PATHFINDER_R:
RNS.log("Dropping announce for "+RNS.prettyhexrep(destination_hash)+", retries exceeded", RNS.LOG_DEBUG)
Transport.announce_table.pop(destination_hash)
break
else:
if time.time() > announce_entry[1]:
announce_entry[1] = time.time() + math.pow(Transport.PATHFINDER_C, announce_entry[4]) + Transport.PATHFINDER_T + Transport.PATHFINDER_RW
announce_entry[2] += 1
packet = announce_entry[5]
block_rebroadcasts = announce_entry[7]
announce_context = RNS.Packet.NONE
if block_rebroadcasts:
announce_context = RNS.Packet.PATH_RESPONSE
announce_data = packet.data
announce_identity = RNS.Identity.recall(packet.destination_hash)
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
announce_destination.hash = packet.destination_hash
announce_destination.hexhash = announce_destination.hash.hex()
new_packet = RNS.Packet(announce_destination, announce_data, RNS.Packet.ANNOUNCE, context = announce_context, header_type = RNS.Packet.HEADER_2, transport_type = Transport.TRANSPORT, transport_id = Transport.identity.hash)
new_packet.hops = announce_entry[4]
RNS.log("Rebroadcasting announce for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG)
outgoing.append(new_packet)
# Process announces needing retransmission
if time.time() > Transport.announces_last_checked+Transport.announces_check_interval:
for destination_hash in Transport.announce_table:
announce_entry = Transport.announce_table[destination_hash]
if announce_entry[2] > Transport.PATHFINDER_R:
RNS.log("Dropping announce for "+RNS.prettyhexrep(destination_hash)+", retries exceeded", RNS.LOG_DEBUG)
Transport.announce_table.pop(destination_hash)
break
else:
if time.time() > announce_entry[1]:
announce_entry[1] = time.time() + math.pow(Transport.PATHFINDER_C, announce_entry[4]) + Transport.PATHFINDER_T + Transport.PATHFINDER_RW
announce_entry[2] += 1
packet = announce_entry[5]
block_rebroadcasts = announce_entry[7]
attached_interface = announce_entry[8]
announce_context = RNS.Packet.NONE
if block_rebroadcasts:
announce_context = RNS.Packet.PATH_RESPONSE
announce_data = packet.data
announce_identity = RNS.Identity.recall(packet.destination_hash)
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
announce_destination.hash = packet.destination_hash
announce_destination.hexhash = announce_destination.hash.hex()
new_packet = RNS.Packet(
announce_destination,
announce_data,
RNS.Packet.ANNOUNCE,
context = announce_context,
header_type = RNS.Packet.HEADER_2,
transport_type = Transport.TRANSPORT,
transport_id = Transport.identity.hash,
attached_interface = attached_interface
)
Transport.announces_last_checked = time.time()
new_packet.hops = announce_entry[4]
if block_rebroadcasts:
RNS.log("Rebroadcasting announce as path response for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG)
else:
RNS.log("Rebroadcasting announce for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG)
outgoing.append(new_packet)
# This handles an edge case where a peer sends a past
# request for a destination just after an announce for
# said destination has arrived, but before it has been
# rebroadcast locally. In such a case the actual announce
# is temporarily held, and then reinserted when the path
# request has been served to the peer.
if destination_hash in Transport.held_announces:
held_entry = Transport.held_announces.pop(destination_hash)
Transport.announce_table[destination_hash] = held_entry
RNS.log("Reinserting held announce into table", RNS.LOG_DEBUG)
Transport.announces_last_checked = time.time()
# Cull the packet hashlist if it has reached max size
@ -365,6 +396,21 @@ class Transport:
RNS.log(str(interface)+" received packet with hash "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_EXTREME)
if len(Transport.local_client_interfaces) > 0:
new_raw = packet.raw[0:1]
new_raw += struct.pack("!B", packet.hops)
new_raw += packet.raw[2:]
for local_client in Transport.local_client_interfaces:
if local_client != interface:
local_client.processOutgoing(new_raw)
if Transport.is_local_client_interface(interface):
packet.hops -= 1
elif Transport.interface_to_shared_instance(interface):
packet.hops -= 1
if Transport.packet_filter(packet):
Transport.packet_hashlist.append(packet.packet_hash)
Transport.cache(packet)
@ -382,7 +428,7 @@ class Transport:
RNS.log("Next hop to destination is "+RNS.prettyhexrep(next_hop)+" with "+str(remaining_hops)+" hops remaining, transporting it.", RNS.LOG_DEBUG)
if remaining_hops > 1:
# Just increase hop count and transmit
new_raw = packet.raw[0:1]
new_raw = packet.raw[0:1]
new_raw += struct.pack("!B", packet.hops)
new_raw += next_hop
new_raw += packet.raw[12:]
@ -542,19 +588,39 @@ class Transport:
should_add = True
if should_add:
now = time.time()
retries = 0
expires = now + Transport.PATHFINDER_E
now = time.time()
retries = 0
expires = now + Transport.PATHFINDER_E
announce_hops = packet.hops
local_rebroadcasts = 0
block_rebroadcasts = False
random_blobs.append(random_blob)
attached_interface = None
retransmit_timeout = now + math.pow(Transport.PATHFINDER_C, packet.hops) + (RNS.rand() * Transport.PATHFINDER_RW)
random_blobs.append(random_blob)
if RNS.Reticulum.transport_enabled() and packet.context != RNS.Packet.PATH_RESPONSE:
Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, packet.hops, packet, local_rebroadcasts, block_rebroadcasts]
if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE:
# If the announce is from a local client,
# we announce it immediately, but only one
# time, and also set the hops to 0.
if Transport.from_local_client(packet):
retransmit_timeout = now
retries = Transport.PATHFINDER_R
Transport.destination_table[packet.destination_hash] = [now, received_from, packet.hops, expires, random_blobs, packet.receiving_interface, packet]
RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(packet.hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG)
Transport.announce_table[packet.destination_hash] = [
now,
retransmit_timeout,
retries,
received_from,
announce_hops,
packet,
local_rebroadcasts,
block_rebroadcasts,
attached_interface
]
Transport.destination_table[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet]
RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_VERBOSE)
elif packet.packet_type == RNS.Packet.LINKREQUEST:
for destination in Transport.destinations:
@ -764,12 +830,23 @@ class Transport:
packet.send()
@staticmethod
def pathRequestHandler(data, packet):
if len(data) >= RNS.Identity.TRUNCATED_HASHLENGTH//8:
Transport.pathRequest(data[:RNS.Identity.TRUNCATED_HASHLENGTH//8])
def requestPathOnInterface(destination_hash, interface):
path_request_data = destination_hash + RNS.Identity.getRandomHash()
path_request_dst = RNS.Destination(None, RNS.Destination.OUT, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request")
packet = RNS.Packet(path_request_dst, path_request_data, packet_type = RNS.Packet.DATA, transport_type = RNS.Transport.BROADCAST, header_type = RNS.Packet.HEADER_1, attached_interface = interface)
packet.send()
@staticmethod
def pathRequest(destination_hash):
def pathRequestHandler(data, packet):
if len(data) >= RNS.Identity.TRUNCATED_HASHLENGTH//8:
Transport.pathRequest(
data[:RNS.Identity.TRUNCATED_HASHLENGTH//8],
Transport.from_local_client(packet),
packet.receiving_interface
)
@staticmethod
def pathRequest(destination_hash, is_from_local_client, attached_interface):
RNS.log("Path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG)
local_destination = next((d for d in Transport.destinations if d.hash == destination_hash), None)
@ -777,7 +854,7 @@ class Transport:
RNS.log("Destination is local to this system, announcing", RNS.LOG_DEBUG)
local_destination.announce(path_response=True)
elif RNS.Reticulum.transport_enabled() and destination_hash in Transport.destination_table:
elif (RNS.Reticulum.transport_enabled() or is_from_local_client) and destination_hash in Transport.destination_table:
RNS.log("Path found, inserting announce for transmission", RNS.LOG_DEBUG)
packet = Transport.destination_table[destination_hash][6]
received_from = Transport.destination_table[destination_hash][5]
@ -786,9 +863,27 @@ class Transport:
retries = Transport.PATHFINDER_R
local_rebroadcasts = 0
block_rebroadcasts = True
announce_hops = packet.hops
retransmit_timeout = now + Transport.PATH_REQUEST_GRACE # + (RNS.rand() * Transport.PATHFINDER_RW)
Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, packet.hops, packet, local_rebroadcasts, block_rebroadcasts]
# This handles an edge case where a peer sends a past
# request for a destination just after an announce for
# said destination has arrived, but before it has been
# rebroadcast locally. In such a case the actual announce
# is temporarily held, and then reinserted when the path
# request has been served to the peer.
if packet.destination_hash in Transport.announce_table:
held_entry = Transport.announce_table[packet.destination_hash]
Transport.held_announces[packet.destination_hash] = held_entry
Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, announce_hops, packet, local_rebroadcasts, block_rebroadcasts, attached_interface]
elif is_from_local_client:
# Forward path request on all interfaces
# except the local client
for interface in Transport.interfaces:
if not interface == attached_interface:
Transport.requestPathOnInterface(destination_hash, interface)
else:
RNS.log("No known path to requested destination, ignoring request", RNS.LOG_DEBUG)
@ -800,6 +895,30 @@ class Transport:
# TODO: implement this
pass
@staticmethod
def from_local_client(packet):
if hasattr(packet.receiving_interface, "parent_interface"):
return Transport.is_local_client_interface(packet.receiving_interface)
else:
return False
@staticmethod
def is_local_client_interface(interface):
if hasattr(interface, "parent_interface"):
if hasattr(interface.parent_interface, "is_local_shared_instance"):
return True
else:
return False
else:
return False
@staticmethod
def interface_to_shared_instance(interface):
if hasattr(interface, "is_connected_to_shared_instance"):
return True
else:
return False
@staticmethod
def exitHandler():
RNS.log("Saving packet hashlist to storage...", RNS.LOG_VERBOSE)
@ -820,7 +939,7 @@ class Transport:
de = Transport.destination_table[destination_hash]
interface_hash = de[5].get_hash()
# Only store destination tablee entry if the associated
# Only store destination table entry if the associated
# interface is still active
interface = Transport.find_interface_from_hash(interface_hash)
if interface != None: