python - set is being modified and then seems to magically revert -
i'm trying build channel reliable delivery on top of unreliable channel (it's exercise, unreliable channel explicitly drops of packets). have acks set contains (address, sequence_number) pairs. when ack received on channel it's added acks set , condition variable notified:
msg, addr = self.unreliable_channel.recv() if isinstance(msg, ack): self.acks_cond: self.acks.add((addr, msg.ack)) print "{} got ack: {} ({})".format(self.port, self.acks, hex(id(self.acks))) self.acks_cond.notify() a different thread listening on condition variable , checking acks in thread:
with self.acks_cond: max_wait = 2 * self.unreliable_channel.delay_avg start = time.time() while not ((addr, msg.seq) in self.acks) , (time.time() - start < max_wait): print "{}: self.acks {} ({})".format(self.port, self.acks, hex(id(self.acks))) self.acks_cond.wait(0.1) print "{} waited ack of {} {}: {} ({})".format(self.port, msg.seq, addr, self.acks, hex(id(self.acks))) if (addr, msg.seq) in self.acks: print '!' * 10000 # self.acks.remove((addr, msg.seq)) return however, second snippet can't seem see modified set:
10000 got ack: set([(('192.168.1.7', 10001), 1), 'toplel']) (0x7f40a944ced0) 10000: self.acks set(['toplel']) (0x7f40a944ced0) ('toplel' string threw in set make sure wasn't somehow being emptied)
anyone have idea of messing up?
code dump below: (tried making sscce couldn't seem reproduce behavior -- i'll try more though if one).
import threading import time collections import defaultdict, namedtuple message = namedtuple('message', ['seq', 'data']) ack = namedtuple('ack', ['ack']) class reliablechannel: '''building on top of unreliablechannel, channel supports guarenteed eventual delivery, fifo delivery on per-destination basis, , no duplicated delivery.''' def __init__(self, unreliable_channel): self.unreliable_channel = unreliable_channel # set of (addr, seq) pairs we've recieved acks print "initializing toplel" self.acks = set(["toplel"]) self.acks_cond = threading.condition() self.seq = defaultdict(int) self.port = self.unreliable_channel.sock.getsockname()[1] print 'self.port: {}'.format(self.port) # leftoff: thread started can't seem modify self.acks unicast can see self.listener = threading.thread(target=self.listen) self.listener.start() def listen(self): while true: msg, addr = self.unreliable_channel.recv() if isinstance(msg, ack): self.acks_cond: self.acks.add((addr, msg.ack)) print "{} got ack: {} ({})".format(self.port, self.acks, hex(id(self.acks))) self.acks_cond.notify() else: ack = ack(msg.seq) self.unreliable_channel.unicast(ack, addr) print '{} got message {} , sent {} {}'.format(self.port, msg, ack, addr) def unicast(self, msg, addr): self.seq[addr] += 1 # sequence number message msg = message(self.seq[addr], msg) print '{} trying send message {} {}'.format(self.port, msg, addr) while true: # send message self.unreliable_channel.unicast(msg, addr) # wait ack timeout self.acks_cond: max_wait = 2 * self.unreliable_channel.delay_avg start = time.time() while not ((addr, msg.seq) in self.acks) , (time.time() - start < max_wait): print "{}: self.acks {} ({})".format(self.port, self.acks, hex(id(self.acks))) self.acks_cond.wait(0.1) print "{} waited ack of {} {}: {} ({})".format(self.port, msg.seq, addr, self.acks, hex(id(self.acks))) if (addr, msg.seq) in self.acks: print '!' * 10000 # self.acks.remove((addr, msg.seq)) return edit: after doing more messing around, seems sets "diverge" in 2 threads after while. depending on in listen, modifications list take, after that, seems each thread working own copy of set (ie, tried adding stuff set in both threads).
self-answer: knew stupid. __init__ method above launches thread, , being run in __init__ method of multiprocessing.process subclass. meaning thread running listen being launched in 1 process, while thread running unicast being run in separate one, dup-ed copy of set guess.
should've realized sooner fact ids same throwing me off -- guess figured forked process own virtual addresses, odds of being same 1 low. didn't think assume re-use same virtual address.
tl;dr: id(x) == id(y) doesn't mean x , y same object if you're messing multiprocessing.
Comments
Post a Comment