c++ - ZMQ: socket_send/recv blocking -


so i'm trying set simple communication between zmq in python , zmq in c/c++ extension. python sets context, binds inproc socket, , passes context , socket name extension. extension sets own socket, connects, , listens messages. python sends header , string representation of dictionary on extension. pretty simple stuff using req/rep sockets. however, reason can't seem find, call socket.send blocking, , extension never gets past call zmq_recv. have test environment have same scenario play out, sockets don't block, , i've triple-checked code , should working in same way.

python:

import zmq import cppextension # no lectures using threading please. i'm restricted to, in essence # using function because of code base i'm working with. thread import start_new_thread  socket = self.zmq_context.socket(zmq.req) socket_name = "inproc://agl" socket.bind(socket_name) t = start_native_thread(cppextension.actor,                         (self.zmq_context, socket_name))  test_send = {"foo": 1, "bar": 2} # blocks on line vvvvv socket.send("test", flags=zmq.sndmore) socket.send(str(test_send)) socket.recv() socket.send("stop") 

c/c++:

// these used std::basic_string<py_unicode> reverted // normal std::string can use json parsing library. typedef string pystring; typedef char pystring_t;  extern "c" pyobject * actor(pyobject *self, pyobject *args) {     pyobject *py_context, *py_connect_to;     pythreadstate *_save;     void *context;     char *connect_to;     void *socket;     int rc;      if(!pyarg_parsetuple(args, "oo", &py_context, &py_connect_to)) {         pyerr_setstring(pyexc_typeerror, "expected 2 arguments (zmq context, name of socket connect to)");         return null;     }     py_context = pyobject_getattrstring(py_context, "_handle");     if(py_context == null) {         pyerr_setstring(pyexc_typeerror, "could not '_handle' context");         return null;     }     if(!pyint_check(py_context)) {         pyerr_setstring(pyexc_typeerror, "_handle not integer");         return null;     }     context = (void*)pyint_aslong(py_context);     connect_to = new char[pystring_size(py_connect_to) + 1];     strcpy(connect_to, pystring_asstring(py_connect_to));     _save = pyeval_savethread();      //     // gil-less operation begin     // ** warning: not call functions begin 'py', or touch     //    data structures begin 'py' while in section. *will*     //    blow python interpreter.     //     socket = zmq_socket(context, zmq_rep);     rc = zmq_connect(socket, connect_to);      pystring test("test");     pystring stop("stop");     pystring success("success");     pystring failure("failure");      if(rc == 0) {         int going = 1;         // should able hold full megabyte of text, should enough         // message being passed in.         // there way query size of incoming message...?         char buffer[1000000];         while(going) {             // blocks on line vvvvvv             int size = zmq_recv(socket, buffer, 1000000, 0);             if(size == -1) {                 // error                 continue;             }             // assume don't larger 1mb of data. should put             // check around @ point, not right now.             buffer[size] = 0;              pystring fullmsg(buffer);             cout << "zmq recieved: " << fullmsg << endl;             if(fullmsg == test) {                 size = zmq_recv(socket, &buffer, 1000000, 0);                 if(size != -1) {                     buffer[size] = 0;                     pystring json_fullmsg(buffer);                     cout << "zmq json: " << json_fullmsg << endl;                     contacts.add(json_fullmsg);                     zmq_send(socket, success.c_str(), success.size() + 1, 0);                 }                 else {                     zmq_send(socket, failure.c_str(), failure.size() + 1, 0);                 }             }             else if(fullmsg == stop) {                 going = 0;                 zmq_send(socket, success.c_str(), success.size() + 1, 0);             }         }     }     else {         // error         int err = zmq_errno();         switch(err) {         case einval:             cout << "zmq connect err: " << "endpoint supplied invalid" << endl;             break;         default:             cout << "zmq connect err: " << err << endl;             break;         }     }     zmq_close(socket);     //     // gil-less operation end     //      pyeval_restorethread(_save);     py_incref(py_none);     return py_none; } 

any figuring out what's going on here appreciated.

edit: note code running in environment gevent has monkeypatched standard library. part of reason i'm using thread.start_new_thread, because saved before monkeypatching happened, , want real thread not green thread.

two things,

because using req/rep in modified version, "send,send,recv,send..." not work. both send/recv must work in 'lock-step' manner (send,recv,send,revc.)

the zmq_noblock raise exception of eagain, may mean "the socket connection not complete, please come later." try placing timer/sleep after bind , both send/recv. what's causing "resource temporarily unavailable" msg.

hope helps

mr. onoffon


Comments

Popular posts from this blog

java - Run a .jar on Heroku -

java - Jtable duplicate Rows -

validation - How to pass paramaters like unix into windows batch file -