结合使用node.js和Python

问题:结合使用node.js和Python

Node.js非常适合我们的Web项目,但是很少有需要Python的计算任务。我们已经为他们准备了Python代码。我们非常关心速度,如何以异步非阻塞方式从node.js调用Python“工作者”的最优雅方法是什么?

Node.js is a perfect match for our web project, but there are few computational tasks for which we would prefer Python. We also already have a Python code for them. We are highly concerned about speed, what is the most elegant way how to call a Python “worker” from node.js in an asynchronous non-blocking way?


回答 0

对于node.js和Python服务器之间的通信,如果两个进程都在同一服务器上运行,则我将使用Unix套接字,否则将使用TCP / IP套接字。对于封送处理协议,我将使用JSON或协议缓冲区。如果线程化Python成为瓶颈,请考虑使用Twisted Python,它提供与node.js相同的事件驱动的并发性。

如果您喜欢冒险,请学习clojureclojurescriptclojure-py),您将获得与Java,JavaScript(包括node.js),CLR和Python上的现有代码可运行并互操作的相同语言。通过使用clojure数据结构,您将获得出色的编组协议。

For communication between node.js and Python server, I would use Unix sockets if both processes run on the same server and TCP/IP sockets otherwise. For marshaling protocol I would take JSON or protocol buffer. If threaded Python shows up to be a bottleneck, consider using Twisted Python, which provides the same event driven concurrency as do node.js.

If you feel adventurous, learn clojure (clojurescript, clojure-py) and you’ll get the same language that runs and interoperates with existing code on Java, JavaScript (node.js included), CLR and Python. And you get superb marshalling protocol by simply using clojure data structures.


回答 1

这听起来像一个零MQ非常合适的场景。这是一个消息传递框架,与使用TCP或Unix套接字类似,但功能更强大(http://zguide.zeromq.org/py:all

有一个库使用zeroMQ提供了一个运行良好的RPC框架。它称为zeroRPC(http://www.zerorpc.io/)。这是你好世界。

Python“ Hello x”服务器:

import zerorpc

class HelloRPC(object):
    '''pass the method a name, it replies "Hello name!"'''
    def hello(self, name):
        return "Hello, {0}!".format(name)

def main():
    s = zerorpc.Server(HelloRPC())
    s.bind("tcp://*:4242")
    s.run()

if __name__ == "__main__" : main()

和node.js客户端:

var zerorpc = require("zerorpc");

var client = new zerorpc.Client();
client.connect("tcp://127.0.0.1:4242");
//calls the method on the python object
client.invoke("hello", "World", function(error, reply, streaming) {
    if(error){
        console.log("ERROR: ", error);
    }
    console.log(reply);
});

反之亦然,node.js服务器:

var zerorpc = require("zerorpc");

var server = new zerorpc.Server({
    hello: function(name, reply) {
        reply(null, "Hello, " + name, false);
    }
});

server.bind("tcp://0.0.0.0:4242");

和python客户端

import zerorpc, sys

c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
name = sys.argv[1] if len(sys.argv) > 1 else "dude"
print c.hello(name)

This sounds like a scenario where zeroMQ would be a good fit. It’s a messaging framework that’s similar to using TCP or Unix sockets, but it’s much more robust (http://zguide.zeromq.org/py:all)

There’s a library that uses zeroMQ to provide a RPC framework that works pretty well. It’s called zeroRPC (http://www.zerorpc.io/). Here’s the hello world.

Python “Hello x” server:

import zerorpc

class HelloRPC(object):
    '''pass the method a name, it replies "Hello name!"'''
    def hello(self, name):
        return "Hello, {0}!".format(name)

def main():
    s = zerorpc.Server(HelloRPC())
    s.bind("tcp://*:4242")
    s.run()

if __name__ == "__main__" : main()

And the node.js client:

var zerorpc = require("zerorpc");

var client = new zerorpc.Client();
client.connect("tcp://127.0.0.1:4242");
//calls the method on the python object
client.invoke("hello", "World", function(error, reply, streaming) {
    if(error){
        console.log("ERROR: ", error);
    }
    console.log(reply);
});

Or vice-versa, node.js server:

var zerorpc = require("zerorpc");

var server = new zerorpc.Server({
    hello: function(name, reply) {
        reply(null, "Hello, " + name, false);
    }
});

server.bind("tcp://0.0.0.0:4242");

And the python client

import zerorpc, sys

c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
name = sys.argv[1] if len(sys.argv) > 1 else "dude"
print c.hello(name)

回答 2

如果您安排将Python工作进程放在单独的进程中(长时间运行的服务器类型进程或按需生成的子进程),则与之进行的通信在node.js端将是异步的。UNIX / TCP套接字和stdin / out / err通信本身在节点中是异步的。

If you arrange to have your Python worker in a separate process (either long-running server-type process or a spawned child on demand), your communication with it will be asynchronous on the node.js side. UNIX/TCP sockets and stdin/out/err communication are inherently async in node.


回答 3

我也会考虑Apache Thrift http://thrift.apache.org/

它可以在几种编程语言之间架起桥梁,非常高效,并支持异步或同步调用。在此处查看完整功能http://thrift.apache.org/docs/features/

多语言可能对将来的计划很有用,例如,如果您以后想要在C ++中完成部分计算任务,则可以很容易地使用Thrift将其添加到混合中。

I’d consider also Apache Thrift http://thrift.apache.org/

It can bridge between several programming languages, is highly efficient and has support for async or sync calls. See full features here http://thrift.apache.org/docs/features/

The multi language can be useful for future plans, for example if you later want to do part of the computational task in C++ it’s very easy to do add it to the mix using Thrift.


回答 4

使用thoonk.jsthoonk.py取得了很多成功。Thoonk利用Redis(内存中的键值存储)为您提供供稿(如发布/订阅),队列和作业模式进行通信。

为什么这比unix套接字或直接tcp套接字更好?总体性能可能会有所下降,但是Thoonk提供了一个非常简单的API,该API简化了手动处理套接字的过程。Thoonk还可帮助您轻松实现一个分布式计算模型,该模型可让您扩展python worker来提高性能,因为您只是启动了python worker的新实例并将它们连接到同一Redis服务器。

I’ve had a lot of success using thoonk.js along with thoonk.py. Thoonk leverages Redis (in-memory key-value store) to give you feed (think publish/subscribe), queue and job patterns for communication.

Why is this better than unix sockets or direct tcp sockets? Overall performance may be decreased a little, however Thoonk provides a really simple API that simplifies having to manually deal with a socket. Thoonk also helps make it really trivial to implement a distributed computing model that allows you to scale your python workers to increase performance, since you just spin up new instances of your python workers and connect them to the same redis server.


回答 5

我建议使用一些工作队列,例如使用出色的Gearman,它将为您提供一种很好的方式来调度后台作业,并在处理后异步获取其结果。

Digg(在许多其他公司中)经常使用的优点是,它提供了一种强大,可扩展和强大的方法,使任何语言的工作人员都能与任何语言的客户进行交谈。

I’d recommend using some work queue using, for example, the excellent Gearman, which will provide you with a great way to dispatch background jobs, and asynchronously get their result once they’re processed.

The advantage of this, used heavily at Digg (among many others) is that it provides a strong, scalable and robust way to make workers in any language to speak with clients in any language.


回答 6

更新2019

有几种方法可以做到这一点,以下是按复杂度从高到低排列的清单

  1. Python Shell,您将流写入python控制台,它将回写给您
  2. Redis Pub Sub,当节点js发布者推送数据时,您可以使用Python监听频道
  3. Websocket连接,其中Node充当客户端,Python充当服务器,反之亦然
  4. 与Express / Flask / Tornado等的API连接分别与暴露给其他用户查询的API端点一起工作

方法1 Python Shell最简单的方法

source.js文件

const ps = require('python-shell')
// very important to add -u option since our python script runs infinitely
var options = {
    pythonPath: '/Users/zup/.local/share/virtualenvs/python_shell_test-TJN5lQez/bin/python',
    pythonOptions: ['-u'], // get print results in real-time
    // make sure you use an absolute path for scriptPath
    scriptPath: "./subscriber/",
    // args: ['value1', 'value2', 'value3'],
    mode: 'json'
};

const shell = new ps.PythonShell("destination.py", options);

function generateArray() {
    const list = []
    for (let i = 0; i < 1000; i++) {
        list.push(Math.random() * 1000)
    }
    return list
}

setInterval(() => {
    shell.send(generateArray())
}, 1000);

shell.on("message", message => {
    console.log(message);
})

destination.py文件

import datetime
import sys
import time
import numpy
import talib
import timeit
import json
import logging
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

size = 1000
p = 100
o = numpy.random.random(size)
h = numpy.random.random(size)
l = numpy.random.random(size)
c = numpy.random.random(size)
v = numpy.random.random(size)

def get_indicators(values):
    # Return the RSI of the values sent from node.js
    numpy_values = numpy.array(values, dtype=numpy.double) 
    return talib.func.RSI(numpy_values, 14)

for line in sys.stdin:
    l = json.loads(line)
    print(get_indicators(l))
    # Without this step the output may not be immediately available in node
    sys.stdout.flush()

注意:创建一个名为“订户”的文件夹,该文件夹与source.js文件位于同一级别,并将destination.py放入其中。不要忘记更改您的virtualenv环境

Update 2019

There are several ways to achieve this and here is the list in increasing order of complexity

  1. Python Shell, you will write streams to the python console and it will write back to you
  2. Redis Pub Sub, you can have a channel listening in Python while your node js publisher pushes data
  3. Websocket connection where Node acts as the client and Python acts as the server or vice-versa
  4. API connection with Express/Flask/Tornado etc working separately with an API endpoint exposed for the other to query

Approach 1 Python Shell Simplest approach

source.js file

const ps = require('python-shell')
// very important to add -u option since our python script runs infinitely
var options = {
    pythonPath: '/Users/zup/.local/share/virtualenvs/python_shell_test-TJN5lQez/bin/python',
    pythonOptions: ['-u'], // get print results in real-time
    // make sure you use an absolute path for scriptPath
    scriptPath: "./subscriber/",
    // args: ['value1', 'value2', 'value3'],
    mode: 'json'
};

const shell = new ps.PythonShell("destination.py", options);

function generateArray() {
    const list = []
    for (let i = 0; i < 1000; i++) {
        list.push(Math.random() * 1000)
    }
    return list
}

setInterval(() => {
    shell.send(generateArray())
}, 1000);

shell.on("message", message => {
    console.log(message);
})

destination.py file

import datetime
import sys
import time
import numpy
import talib
import timeit
import json
import logging
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

size = 1000
p = 100
o = numpy.random.random(size)
h = numpy.random.random(size)
l = numpy.random.random(size)
c = numpy.random.random(size)
v = numpy.random.random(size)

def get_indicators(values):
    # Return the RSI of the values sent from node.js
    numpy_values = numpy.array(values, dtype=numpy.double) 
    return talib.func.RSI(numpy_values, 14)

for line in sys.stdin:
    l = json.loads(line)
    print(get_indicators(l))
    # Without this step the output may not be immediately available in node
    sys.stdout.flush()

Notes: Make a folder called subscriber which is at the same level as source.js file and put destination.py inside it. Dont forget to change your virtualenv environment