c++ - ZMQ: socket_send/recv blocking
So I'm trying to set up simple communication between ZMQ in Python and ZMQ in a C/C++ extension. Python sets up the context, binds an inproc socket, and passes the context and socket name to the extension. The extension sets up its own socket, connects, and listens for messages. Python then sends a header followed by a string representation of a dictionary on to the extension. Pretty simple stuff using REQ/REP sockets. However, for some reason I can't seem to find, the call to socket.send is blocking, and the extension never gets past the call to zmq_recv. I have a test environment where the same scenario plays out but the sockets don't block, and I've triple-checked the code and it should be working in the same way.
python:
    import zmq
    import cppextension
    # No lectures on using threading please. I'm restricted to, in essence,
    # using this function because of the code base I'm working with.
    from thread import start_new_thread

    socket = self.zmq_context.socket(zmq.REQ)
    socket_name = "inproc://agl"
    socket.bind(socket_name)

    t = start_native_thread(cppextension.actor, (self.zmq_context, socket_name))
    test_send = {"foo": 1, "bar": 2}
    # BLOCKS ON THIS LINE vvvvv
    socket.send("test", flags=zmq.SNDMORE)
    socket.send(str(test_send))
    socket.recv()
    socket.send("stop")
c/c++:
    // These used to be std::basic_string<Py_UNICODE>, but I reverted to a
    // normal std::string so I can use a JSON parsing library.
    typedef string pystring;
    typedef char pystring_t;

    extern "C" PyObject * actor(PyObject *self, PyObject *args)
    {
        PyObject *py_context, *py_connect_to;
        PyThreadState *_save;
        void *context;
        char *connect_to;
        void *socket;
        int rc;

        if(!PyArg_ParseTuple(args, "OO", &py_context, &py_connect_to)) {
            PyErr_SetString(PyExc_TypeError, "Expected 2 arguments (ZMQ context, name of socket to connect to)");
            return NULL;
        }
        py_context = PyObject_GetAttrString(py_context, "_handle");
        if(py_context == NULL) {
            PyErr_SetString(PyExc_TypeError, "Could not get '_handle' from context");
            return NULL;
        }
        if(!PyInt_Check(py_context)) {
            PyErr_SetString(PyExc_TypeError, "_handle was not an integer");
            return NULL;
        }
        context = (void*)PyInt_AsLong(py_context);
        connect_to = new char[PyString_Size(py_connect_to) + 1];
        strcpy(connect_to, PyString_AsString(py_connect_to));

        _save = PyEval_SaveThread();
        //
        // GIL-less operation BEGIN
        // ** WARNING: Do not call any functions that begin with 'Py', or touch
        // any data structures that begin with 'Py', while in this section. It
        // *WILL* blow up the Python interpreter.
        //
        socket = zmq_socket(context, ZMQ_REP);
        rc = zmq_connect(socket, connect_to);
        pystring test("test");
        pystring stop("stop");
        pystring success("success");
        pystring failure("failure");
        if(rc == 0) {
            int going = 1;
            // Should be able to hold a full megabyte of text, which should be
            // enough for any message being passed in.
            // Is there a way to query the size of the incoming message...?
            char buffer[1000000];
            const int max_len = sizeof(buffer) - 1;  // leave room for the terminator
            while(going) {
                // BLOCKS ON THIS LINE vvvvvv
                int size = zmq_recv(socket, buffer, max_len, 0);
                if(size == -1) {
                    // ERROR
                    continue;
                }
                // zmq_recv reports the full message size even when it truncates,
                // so clamp before terminating the string.
                if(size > max_len) size = max_len;
                buffer[size] = 0;
                pystring fullmsg(buffer);
                cout << "ZMQ received: " << fullmsg << endl;
                if(fullmsg == test) {
                    size = zmq_recv(socket, buffer, max_len, 0);
                    if(size != -1) {
                        if(size > max_len) size = max_len;
                        buffer[size] = 0;
                        pystring json_fullmsg(buffer);
                        cout << "ZMQ JSON: " << json_fullmsg << endl;
                        contacts.add(json_fullmsg);
                        zmq_send(socket, success.c_str(), success.size() + 1, 0);
                    } else {
                        zmq_send(socket, failure.c_str(), failure.size() + 1, 0);
                    }
                } else if(fullmsg == stop) {
                    going = 0;
                    zmq_send(socket, success.c_str(), success.size() + 1, 0);
                }
            }
        } else {
            // ERROR
            int err = zmq_errno();
            switch(err) {
            case EINVAL:
                cout << "ZMQ connect err: " << "Endpoint supplied is invalid" << endl;
                break;
            default:
                cout << "ZMQ connect err: " << err << endl;
                break;
            }
        }
        zmq_close(socket);
        //
        // GIL-less operation END
        //
        PyEval_RestoreThread(_save);
        delete[] connect_to;
        Py_INCREF(Py_None);
        return Py_None;
    }
Any help figuring out what's going on here would be appreciated.
Edit: Also note that this code is running in an environment where gevent has monkeypatched the standard library. That's part of the reason I'm using thread.start_new_thread: it was saved before the monkeypatching happened, and I want a real thread, not a green thread.
Two things:
Because you are using REQ/REP in a modified version, the "send, send, recv, send..." sequence will not work. Both send and recv must work in a 'lock-step' manner (send, recv, send, recv).
ZMQ_NOBLOCK will raise an EAGAIN exception, which may mean "the socket connection is not complete, please come back later." Try placing a timer/sleep after the bind and around both send and recv. This is what causes the "Resource temporarily unavailable" message.
Hope this helps.
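The lock-step rule from the first point can be sketched with pyzmq as below. This is a minimal illustration written for Python 3, not the asker's actual setup: `rep_worker` and `run_exchange` are hypothetical names, and the worker thread is only a stand-in for the C++ actor. One detail worth keeping in mind is that frames chained with `zmq.SNDMORE` form a single multipart message, so they count as one "send" for the REQ socket; it is a second complete message sent before the reply is read that gets rejected.

```python
import threading

import zmq


def rep_worker(context, endpoint, ready):
    """Hypothetical stand-in for the C++ actor: one reply per request."""
    sock = context.socket(zmq.REP)
    sock.connect(endpoint)
    ready.set()
    while True:
        frames = sock.recv_multipart()  # one logical request, any number of frames
        sock.send(b"success")           # exactly one reply before the next recv
        if frames[0] == b"stop":
            break
    sock.close()


def run_exchange():
    context = zmq.Context()
    endpoint = "inproc://agl"
    req = context.socket(zmq.REQ)
    req.bind(endpoint)  # bind first; inproc peers must share this context

    ready = threading.Event()
    worker = threading.Thread(target=rep_worker, args=(context, endpoint, ready))
    worker.start()
    ready.wait()

    replies = []

    # Header frame plus payload frame: together they are ONE request.
    req.send(b"test", zmq.SNDMORE)
    req.send(b'{"foo": 1, "bar": 2}')

    # A second request before reading the reply breaks the lock-step.
    second_send_rejected = False
    try:
        req.send(b"too early")
    except zmq.ZMQError as exc:
        second_send_rejected = exc.errno == zmq.EFSM

    replies.append(req.recv())  # read the reply, then sending is allowed again

    req.send(b"stop")
    replies.append(req.recv())

    worker.join()
    req.close()
    context.term()
    return replies, second_send_rejected
```

Calling `run_exchange()` performs two request/reply round trips and reports whether the out-of-turn send was refused by the socket's state machine.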
mr. onoffon
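As a rough illustration of the EAGAIN point above, the following pyzmq sketch (Python 3) shows a non-blocking receive failing when nothing is waiting, and a poller with a timeout as one possible alternative to sprinkling sleeps. The function name and the `inproc://eagain-demo` endpoint are made up for this example; pyzmq surfaces EAGAIN as the `zmq.Again` exception.

```python
import zmq


def try_recv_nonblocking():
    context = zmq.Context()
    rep = context.socket(zmq.REP)
    rep.bind("inproc://eagain-demo")  # hypothetical endpoint, no peer connects

    # With NOBLOCK the call returns immediately; with no message queued
    # pyzmq raises zmq.Again ("Resource temporarily unavailable").
    try:
        rep.recv(flags=zmq.NOBLOCK)
        outcome = "received"
    except zmq.Again:
        outcome = "again"

    # Instead of sleeping and retrying, wait up to 100 ms for readability.
    poller = zmq.Poller()
    poller.register(rep, zmq.POLLIN)
    events = dict(poller.poll(timeout=100))
    readable = rep in events

    rep.close()
    context.term()
    return outcome, readable
```

Because no request is ever sent to this socket, the non-blocking receive takes the `zmq.Again` branch and the poll times out without reporting the socket as readable.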