Tag archive: ipc

Combining node.js and Python

Question: Combining node.js and Python

Node.js is a perfect match for our web project, but there are a few computational tasks for which we would prefer Python, and we already have Python code for them. We are highly concerned about speed: what is the most elegant way to call a Python “worker” from node.js in an asynchronous, non-blocking way?


Answer 0

For communication between the node.js and Python servers, I would use Unix sockets if both processes run on the same host and TCP/IP sockets otherwise. For the marshaling protocol I would use JSON or Protocol Buffers. If threaded Python turns out to be a bottleneck, consider using Twisted Python, which provides the same event-driven concurrency as node.js does.
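A minimal sketch of the Python side of this setup, assuming newline-delimited JSON as the message framing (the answer only says "JSON"; the one-message-per-line convention and the names `serve_json_lines` and `run_unix_server` are illustrative). A node.js client would connect to the same socket path with its built-in `net` module and write one JSON document per line.

```python
import json
import os
import socket
from typing import Any, Callable

Handler = Callable[[Any], Any]


def serve_json_lines(conn: socket.socket, handler: Handler) -> None:
    """Answer each newline-terminated JSON request on conn with one JSON line."""
    with conn, conn.makefile("rb") as reader:
        for raw in reader:  # blocks until a full line arrives or the peer closes
            if not raw.strip():
                continue  # ignore blank lines
            reply = handler(json.loads(raw))
            conn.sendall(json.dumps(reply).encode("utf-8") + b"\n")


def run_unix_server(path: str, handler: Handler) -> None:
    """Accept connections on a Unix domain socket, serving one client at a time."""
    if os.path.exists(path):
        os.unlink(path)  # remove a stale socket file left by a previous run
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
        server.bind(path)
        server.listen()
        while True:
            conn, _ = server.accept()
            serve_json_lines(conn, handler)
```

For the TCP/IP variant, the same `serve_json_lines` works unchanged; only the listening socket changes to `AF_INET` bound to a `(host, port)` pair. This sketch serves clients sequentially; serving several node.js connections at once would need threads or an event loop (which is where Twisted comes in).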

If you feel adventurous, learn Clojure (ClojureScript, clojure-py) and you’ll get the same language running on, and interoperating with, existing code on Java, JavaScript (node.js included), the CLR and Python. You also get a superb marshalling protocol by simply using Clojure data structures.


Answer 1

This sounds like a scenario where ZeroMQ would be a good fit. It’s a messaging framework that is similar to using TCP or Unix sockets, but much more robust (http://zguide.zeromq.org/py:all).

There’s a library that uses ZeroMQ to provide an RPC framework that works pretty well. It’s called zeroRPC (http://www.zerorpc.io/). Here’s the hello world.

Python “Hello x” server:

import zerorpc

class HelloRPC(object):
    '''Pass the method a name; it replies "Hello, name!"'''
    def hello(self, name):
        return "Hello, {0}!".format(name)

def main():
    s = zerorpc.Server(HelloRPC())
    s.bind("tcp://*:4242")  # listen on all interfaces, port 4242
    s.run()                 # block and serve requests

if __name__ == "__main__":
    main()

And the node.js client:

var zerorpc = require("zerorpc");

var client = new zerorpc.Client();
client.connect("tcp://127.0.0.1:4242");
// calls the hello method on the Python object
client.invoke("hello", "World", function(error, reply, streaming) {
    if (error) {
        console.log("ERROR: ", error);
        return; // no reply to print
    }
    console.log(reply);
});

Or vice versa, a node.js server:

var zerorpc = require("zerorpc");

var server = new zerorpc.Server({
    hello: function(name, reply) {
        reply(null, "Hello, " + name, false);
    }
});

server.bind("tcp://0.0.0.0:4242");

And the Python client:

import sys
import zerorpc

c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
name = sys.argv[1] if len(sys.argv) > 1 else "dude"
print(c.hello(name))  # Python 3 print; calls hello on the node.js server

Answer 2

If you arrange to have your Python worker in a separate process (either a long-running server-type process or a child spawned on demand), your communication with it will be asynchronous on the node.js side. UNIX/TCP sockets and stdin/stdout/stderr communication are inherently asynchronous in node.
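To illustrate the spawned-child variant, here is a hedged sketch of a Python worker that speaks newline-delimited JSON over stdin/stdout (the framing and the name `run_worker` are assumptions, not part of the answer). node.js would start it with `child_process.spawn` and read its stdout asynchronously, so the worker itself can stay a simple blocking loop.

```python
import json
import sys
from typing import Any, Callable, TextIO


def run_worker(handler: Callable[[Any], Any],
               stdin: TextIO = sys.stdin,
               stdout: TextIO = sys.stdout) -> None:
    """Read one JSON request per line and write one JSON reply per line."""
    for line in stdin:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        result = handler(json.loads(line))
        stdout.write(json.dumps(result) + "\n")
        # Flush after every reply so node.js receives it immediately
        # instead of waiting for the pipe buffer to fill up.
        stdout.flush()
```

In the actual worker script you would end with a call such as `run_worker(compute)`, where `compute` is a hypothetical function wrapping your existing Python code; the streams are parameters only so the loop can be exercised without a real child process.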


Answer 3

I’d also consider Apache Thrift: http://thrift.apache.org/

It can bridge between several programming languages, is highly efficient, and supports both asynchronous and synchronous calls. See the full feature list here: http://thrift.apache.org/docs/features/

The multi-language support can be useful for future plans: for example, if you later want to do part of the computational task in C++, it’s very easy to add it to the mix using Thrift.


Answer 4

I’ve had a lot of success using thoonk.js along with thoonk.py. Thoonk leverages Redis (an in-memory key-value store) to give you feed (think publish/subscribe), queue, and job patterns for communication.

Why is this better than Unix sockets or direct TCP sockets? Overall performance may decrease a little; however, Thoonk provides a really simple API that spares you from dealing with sockets manually. Thoonk also makes it trivial to implement a distributed computing model that lets you scale your Python workers to increase performance: you just spin up new instances of your Python workers and connect them to the same Redis server.
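Thoonk’s own API is not shown here. As a stdlib-only sketch of the job pattern the answer relies on (several identical workers pulling from one shared queue, so throughput grows by starting more workers), the code below uses an in-process `queue.Queue` as a stand-in for the shared Redis server. The names `run_jobs` and `worker` are illustrative; with Thoonk, the queue would live in Redis and the workers would be separate Python processes.

```python
import queue
import threading
from typing import Any, Callable, Dict, List

_STOP = object()  # sentinel that tells a worker to exit


def worker(jobs: "queue.Queue[Any]", results: Dict[int, Any],
           lock: threading.Lock, compute: Callable[[Any], Any]) -> None:
    """One competing consumer: take (job_id, payload) items until the sentinel."""
    while True:
        item = jobs.get()
        if item is _STOP:
            return
        job_id, payload = item
        value = compute(payload)
        with lock:
            results[job_id] = value


def run_jobs(payloads: List[Any], compute: Callable[[Any], Any],
             n_workers: int = 4) -> List[Any]:
    """Fan payloads out to n_workers consumers and return results in input order."""
    jobs: "queue.Queue[Any]" = queue.Queue()
    results: Dict[int, Any] = {}
    lock = threading.Lock()
    threads = [threading.Thread(target=worker, args=(jobs, results, lock, compute))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    for job_id, payload in enumerate(payloads):
        jobs.put((job_id, payload))  # producer side: enqueue a job
    for _ in threads:
        jobs.put(_STOP)  # one sentinel per worker, queued after all real jobs
    for t in threads:
        t.join()
    return [results[i] for i in range(len(payloads))]
```

Scaling here means a larger `n_workers`; in the Redis-backed setup it means starting more worker processes. For CPU-bound Python code, threads inside one CPython process are limited by the GIL, which is one reason the real setup runs workers as separate processes.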


Answer 5

I’d recommend using a work queue, for example the excellent Gearman, which gives you a great way to dispatch background jobs and asynchronously get their results once they’re processed.

The advantage of this approach, used heavily at Digg (among many others), is that it provides a strong, scalable, and robust way for workers in any language to talk to clients in any language.


Answer 6

Update 2019

There are several ways to achieve this; here they are in increasing order of complexity:

  1. Python Shell: you write to a Python child process's stdin and it writes back to you on stdout
  2. Redis Pub/Sub: a Python process listens on a channel while your Node.js publisher pushes data
  3. WebSocket connection, where Node acts as the client and Python acts as the server, or vice versa
  4. API connection: Express/Flask/Tornado etc. running separately, each exposing an API endpoint for the other to query
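Approach 4 does not strictly require a web framework on the Python side. A minimal sketch, assuming the standard library’s `http.server` in place of Flask/Tornado and a hypothetical `/compute` endpoint that accepts a JSON list of numbers and returns their sum; the node.js side would simply POST JSON to it (for example with `fetch`, which recent Node versions provide globally).

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class ComputeHandler(BaseHTTPRequestHandler):
    """Hypothetical endpoint: POST /compute with a JSON list, get {"sum": ...} back."""

    def do_POST(self) -> None:
        if self.path != "/compute":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        values = json.loads(self.rfile.read(length))
        body = json.dumps({"sum": sum(values)}).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args) -> None:
        pass  # silence per-request logging; remove this method to get it back


def start_server(host: str = "127.0.0.1", port: int = 0) -> ThreadingHTTPServer:
    """Serve in a background thread; port 0 lets the OS choose a free port."""
    server = ThreadingHTTPServer((host, port), ComputeHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

As a standalone service you would instead pick a fixed port that node.js knows about and call `serve_forever()` on the main thread, since the daemon thread above stops when the main program exits.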

Approach 1: Python Shell (the simplest approach)

source.js file

const path = require('path')
const ps = require('python-shell')

var options = {
    // point this at the Python interpreter of your own virtualenv
    pythonPath: '/Users/zup/.local/share/virtualenvs/python_shell_test-TJN5lQez/bin/python',
    // very important to add the -u (unbuffered) option since our Python script runs
    // indefinitely; it makes print results arrive in real time
    pythonOptions: ['-u'],
    // use an absolute path for scriptPath
    scriptPath: path.join(__dirname, 'subscriber'),
    // args: ['value1', 'value2', 'value3'],
    mode: 'json' // messages are exchanged as one JSON document per line
};

const shell = new ps.PythonShell("destination.py", options);

function generateArray() {
    const list = []
    for (let i = 0; i < 1000; i++) {
        list.push(Math.random() * 1000)
    }
    return list
}

setInterval(() => {
    shell.send(generateArray())
}, 1000);

shell.on("message", message => {
    console.log(message);
})

destination.py file

import json
import math
import sys

import numpy
import talib

def get_indicators(values):
    # Return the 14-period RSI of the values sent from node.js
    numpy_values = numpy.array(values, dtype=numpy.double)
    return talib.RSI(numpy_values, timeperiod=14)

for line in sys.stdin:
    values = json.loads(line)
    rsi = get_indicators(values)
    # In 'json' mode python-shell parses every output line as JSON, so print JSON
    # rather than the numpy repr. The leading RSI values are NaN, which JSON.parse
    # rejects, so they are sent as null.
    print(json.dumps([None if math.isnan(x) else x for x in rsi.tolist()]))
    # Without this step the output may not be immediately available in node
    sys.stdout.flush()

Note: create a folder called subscriber at the same level as source.js and put destination.py inside it. Don’t forget to point pythonPath at your own virtualenv’s Python interpreter (with numpy and TA-Lib installed).


On localhost, how do I pick a free port number?

Question: On localhost, how do I pick a free port number?

I’m trying to play with inter-process communication, and since I could not figure out how to use named pipes under Windows, I thought I’d use network sockets. Everything happens locally. The server is able to launch slaves in separate processes and listens on some port. The slaves do their work and submit the result to the master. How do I figure out which port is available? I assume I cannot listen on port 80 or 21?

I’m using Python, if that cuts the choices down.

Thanks!


Answer 0

Do not bind to a specific port; instead, bind to port 0, e.g. sock.bind(('', 0)). The OS will then pick an available port for you. You can get the port that was chosen using sock.getsockname()[1] and pass it on to the slaves so that they can connect back.
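A minimal sketch of that flow, with a thread standing in for a slave process. In the real setup the master would hand the port to each slave, for example as a command-line argument when launching it; the `slave` function and the "42" result are illustrative. Binding to 127.0.0.1 instead of '' keeps the port reachable only from the local machine, which matches the everything-is-local scenario.

```python
import socket
import threading


def slave(port: int, result: str) -> None:
    """Hypothetical slave: connect back to the master and submit its result."""
    with socket.create_connection(("127.0.0.1", port)) as conn:
        conn.sendall(result.encode("utf-8"))


def master() -> str:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("127.0.0.1", 0))   # port 0: the OS picks a free port
        server.listen()
        port = server.getsockname()[1]  # the port the OS actually chose
        # "Launch" the slave, telling it which port to connect back to.
        t = threading.Thread(target=slave, args=(port, "42"))
        t.start()
        conn, _ = server.accept()
        with conn:
            chunks = []
            while True:
                data = conn.recv(4096)
                if not data:  # slave closed its end: the result is complete
                    break
                chunks.append(data)
        t.join()
        return b"".join(chunks).decode("utf-8")
```

Because the master keeps the socket bound from the moment the OS picks the port, there is no window in which another process could take it.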


Answer 1

Here is a snippet of what the others explained above:

import socket
from contextlib import closing

def find_free_port():
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        # SO_REUSEADDR only affects a bind() that comes after it
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('', 0))  # port 0: the OS picks a free port
        return s.getsockname()[1]

Answer 2

Bind the socket to port 0. The OS will select a free port from its ephemeral port range, which is OS-dependent (on Linux it is configured in /proc/sys/net/ipv4/ip_local_port_range). You can retrieve the selected port with getsockname() right after bind().


Answer 3

You can listen on whatever port you want; generally, user applications should listen on ports 1024 and above (up to 65535), since on Unix-like systems binding to ports below 1024 normally requires root privileges. The main thing, if you have a variable number of listeners, is to allocate a range to your app (say 20000-21000) and CATCH EXCEPTIONS. That is how you will know whether a port is unusable (in other words, used by another process) on your computer.
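A sketch of the "allocate a range and catch exceptions" approach, using the 20000-21000 range from the answer; `bind_in_range` is an illustrative name. The socket that succeeds is returned still bound and listening, so no other process can grab that port afterwards.

```python
import socket


def bind_in_range(first: int = 20000, last: int = 21000,
                  host: str = "127.0.0.1") -> socket.socket:
    """Return a listening TCP socket bound to the first free port in [first, last]."""
    for port in range(first, last + 1):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind((host, port))
        except OSError:  # port in use (or not permitted): try the next one
            sock.close()
            continue
        sock.listen()
        return sock
    raise OSError(f"no free port in range {first}-{last}")
```

Calling it once per listener gives each one a distinct port, and the final OSError tells you the range is exhausted.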

However, in your case, you shouldn’t have a problem using a single hard-coded port for your listener, as long as you print an error message if the bind fails.

Note also that most of your sockets (for the slaves) do not need to be explicitly bound to specific port numbers: only sockets that wait for incoming connections (like your master here) need to be made listeners and bound to a port. If no port is specified for a socket before it is used, the OS assigns it a usable port. When the master wants to respond to a slave that sent it data, the sender’s address is available when the listener receives the data.

I presume you will be using UDP for this?
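If UDP is the transport, the point about unbound slave sockets can be sketched like this: only the master binds (here to an OS-chosen port), the slave just calls `sendto()` and the OS assigns it a port implicitly, and the master replies to the address returned by `recvfrom()`. The payload and the "ack:" reply format are illustrative.

```python
import socket


def udp_round_trip(payload: bytes) -> bytes:
    """Master/slave exchange over UDP on localhost; returns the reply the slave gets."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as master, \
         socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as slave:
        master.bind(("127.0.0.1", 0))  # only the master needs a known port
        master_addr = master.getsockname()
        master.settimeout(5)
        slave.settimeout(5)

        # The slave never calls bind(); the OS assigns it a port on the first sendto().
        slave.sendto(payload, master_addr)

        data, sender = master.recvfrom(4096)   # sender = (host, port) of the slave
        master.sendto(b"ack:" + data, sender)  # reply straight back to the slave

        reply, _ = slave.recvfrom(4096)
        return reply
```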


Answer 4

If you only need to find a free port for later use, here is a snippet similar to a previous answer, but shorter, using socketserver:

import socketserver

with socketserver.TCPServer(("localhost", 0), None) as s:
    free_port = s.server_address[1]

Note that the port is not guaranteed to remain free, so you may need to put this snippet and the code using it in a loop.
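A minimal sketch of such a loop, assuming the goal is to start a server on the chosen port: if another process grabs the port between the check and the real bind, the bind raises OSError and the loop simply picks a new port. `start_on_free_port`, `start`, and `max_attempts` are illustrative names. Where the API allows it, binding the real server to port 0 directly avoids the race altogether.

```python
import socketserver
from typing import Callable, TypeVar

T = TypeVar("T")


def free_port() -> int:
    # Same idea as the snippet above: let the OS pick a port, then release it.
    with socketserver.TCPServer(("localhost", 0), None) as s:
        return s.server_address[1]


def start_on_free_port(start: Callable[[int], T], max_attempts: int = 5) -> T:
    """Call start(port) with a freshly chosen port, retrying if the bind fails."""
    last_error = None
    for _ in range(max_attempts):
        port = free_port()
        try:
            return start(port)
        except OSError as exc:  # someone else took the port in the meantime
            last_error = exc
    raise RuntimeError(f"could not bind after {max_attempts} attempts") from last_error
```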