Python从门到精通(七):网络-03-创建Rpc服务接口(python调用rpc的方法)

网友投稿 381 2022-08-26


Python从门到精通(七):网络-03-创建Rpc服务接口(python调用rpc的方法)

本章会用两种方式来实现 RPC 服务接口:原生方式和 gRPC 框架。

一、基础实现

1.1、Server

from multiprocessing.connection import Listener
from threading import Thread


def rpc_server(handler, address, authkey):
    """Accept connections forever, serving each on its own daemon thread.

    Args:
        handler: Object exposing ``handle_connection(connection)``; it is
            invoked once per accepted client, in a new thread.
        address: Address handed to ``Listener``, e.g. ``('localhost', 17000)``.
        authkey: Shared secret (bytes) that clients must present to connect.
    """
    # The context manager closes the listening socket when the accept loop
    # is interrupted (for example by KeyboardInterrupt).
    with Listener(address, authkey=authkey) as sock:
        while True:
            client = sock.accept()
            t = Thread(target=handler.handle_connection, args=(client,))
            # Daemon threads do not keep the process alive on shutdown.
            t.daemon = True
            t.start()


# Some remote functions
def add(x, y):
    """Return ``x + y``."""
    return x + y


def sub(x, y):
    """Return ``x - y``."""
    return x - y


def main():
    """Register the sample functions and serve them on localhost:17000."""
    # Imported lazily so add/sub stay importable without the handler module.
    from remote_call import RPCHandler

    # Register with a handler
    handler = RPCHandler()
    handler.register_function(add)
    handler.register_function(sub)

    # Run the server
    rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo')


if __name__ == '__main__':
    main()

1.2、Handler

import pickle


class RPCHandler:
    """Dispatch pickled RPC requests to functions registered by name."""

    def __init__(self):
        # Maps a function's __name__ to the callable itself.
        self._functions = {}

    def register_function(self, func):
        """Expose ``func`` to clients under ``func.__name__``."""
        self._functions[func.__name__] = func

    def handle_connection(self, connection):
        """Serve requests from ``connection`` until the peer disconnects.

        Each request is a pickled ``(func_name, args, kwargs)`` tuple. The
        reply is the pickled return value, or the pickled exception instance
        if the lookup or the call raised.

        Args:
            connection: Object providing ``recv()`` and ``send(data)``.
        """
        try:
            while True:
                # Receive a message
                # NOTE(security): pickle.loads can execute arbitrary code;
                # only accept connections from authenticated, trusted peers.
                func_name, args, kwargs = pickle.loads(connection.recv())
                # Run the RPC and send a response
                try:
                    r = self._functions[func_name](*args, **kwargs)
                    connection.send(pickle.dumps(r))
                except Exception as e:
                    # Report the failure (including an unknown function
                    # name) to the caller instead of dropping the connection.
                    connection.send(pickle.dumps(e))
        except EOFError:
            # recv() signalled end of input: nothing left to serve.
            pass

1.3、Proxy

import pickle


class RPCProxy:
    """Client-side stub that turns attribute access into remote calls.

    ``proxy.name(*args, **kwargs)`` sends the pickled tuple
    ``(name, args, kwargs)`` over the connection and unpickles the reply.
    """

    def __init__(self, connection):
        """Wrap ``connection``, an object providing ``send`` and ``recv``."""
        self._connection = connection

    def __getattr__(self, name):
        """Return a callable that performs the remote call ``name``."""
        def do_rpc(*args, **kwargs):
            self._connection.send(pickle.dumps((name, args, kwargs)))
            # NOTE(security): pickle.loads can execute arbitrary code; only
            # talk to servers you trust.
            result = pickle.loads(self._connection.recv())
            # A reply that is an exception instance is raised locally.
            if isinstance(result, Exception):
                raise result
            return result
        return do_rpc

1.4、Client

from multiprocessing.connection import Client

from chapter11.rpc_proxy import RPCProxy


def main():
    """Connect to the RPC server on localhost:17000 and issue sample calls."""
    # The context manager closes the connection when the demo finishes or
    # raises, so the socket is not left open.
    with Client(('localhost', 17000), authkey=b'peekaboo') as c:
        proxy = RPCProxy(c)
        print(f'add(3, 5) = {proxy.add(3, 5)}')
        print(f'sub(5, 12) = {proxy.sub(5, 12)}')
        # Deliberately invalid call: list - int raises TypeError on the
        # server, which is sent back and raised again on this side.
        proxy.sub([1, 2], 4)


if __name__ == '__main__':
    main()

1.5、JSON实现

1.5.1、Server

import json


class RPCHandler:
    """Dispatch JSON-encoded RPC requests to functions registered by name."""

    def __init__(self):
        # Maps a function's __name__ to the callable itself.
        self._functions = {}

    def register_function(self, func):
        """Expose ``func`` to clients under ``func.__name__``."""
        self._functions[func.__name__] = func

    def handle_connection(self, connection):
        """Serve requests from ``connection`` until the peer disconnects.

        Each request is a JSON array ``[func_name, args, kwargs]``. The reply
        is the JSON-encoded return value. If the lookup, the call, or the
        encoding of the result raises, the reply is the JSON-encoded string
        form of the exception, so a client cannot tell an error apart from
        an ordinary string result.

        Args:
            connection: Object providing ``recv()`` and ``send(data)``.
        """
        try:
            while True:
                # Receive a message
                func_name, args, kwargs = json.loads(connection.recv())
                # Run the RPC and send a response
                try:
                    r = self._functions[func_name](*args, **kwargs)
                    connection.send(json.dumps(r))
                except Exception as e:
                    # Exceptions are not JSON-serializable; send their text.
                    connection.send(json.dumps(str(e)))
        except EOFError:
            # recv() signalled end of input: nothing left to serve.
            pass

1.5.2、Client

import json


class RPCProxy:
    """Client-side stub that turns attribute access into JSON remote calls.

    ``proxy.name(*args, **kwargs)`` sends the JSON encoding of
    ``(name, args, kwargs)`` over the connection and returns the decoded
    reply as-is.
    """

    def __init__(self, connection):
        """Wrap ``connection``, an object providing ``send`` and ``recv``."""
        self._connection = connection

    def __getattr__(self, name):
        """Return a callable that performs the remote call ``name``."""
        def do_rpc(*args, **kwargs):
            self._connection.send(json.dumps((name, args, kwargs)))
            result = json.loads(self._connection.recv())
            return result
        return do_rpc

1.6、xml

from xmlrpc.server import SimpleXMLRPCServer


class KeyValueServer:
    """In-memory key/value store exposed over XML-RPC."""

    # Names of the methods registered with the XML-RPC server.
    _rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']

    def __init__(self, address):
        """Create the XML-RPC server on ``address`` and register the methods.

        Args:
            address: ``(host, port)`` tuple passed to ``SimpleXMLRPCServer``.
        """
        self._data = {}
        # allow_none=True lets methods that return None (set, delete) reply.
        self._serv = SimpleXMLRPCServer(address, allow_none=True)
        for name in self._rpc_methods_:
            self._serv.register_function(getattr(self, name))

    def get(self, name):
        """Return the value stored under ``name``; KeyError if absent."""
        return self._data[name]

    def set(self, name, value):
        """Store ``value`` under ``name``, replacing any previous value."""
        self._data[name] = value

    def delete(self, name):
        """Remove ``name``; KeyError if absent."""
        del self._data[name]

    def exists(self, name):
        """Return True if ``name`` is stored."""
        return name in self._data

    def keys(self):
        """Return the stored names as a list."""
        return list(self._data)

    def serve_forever(self):
        """Run the underlying XML-RPC server's request loop."""
        self._serv.serve_forever()


if __name__ == '__main__':
    kvserv = KeyValueServer(('', 15000))
    kvserv.serve_forever()

from xmlrpc.client import ServerProxy


def main():
    """Exercise the key/value XML-RPC server with a few sample calls."""
    # Port 15000 matches the KeyValueServer example; allow_none=True is
    # needed because set() and delete() return None.
    with ServerProxy('http://localhost:15000', allow_none=True) as s:
        s.set('foo', 'bar')
        s.set('spam', [1, 2, 3])
        print(s.keys())
        print(s.get('foo'))
        print(s.get('spam'))
        s.delete('spam')
        print(s.exists('spam'))


if __name__ == '__main__':
    main()

二、Grpc框架

2.1、安装

# Install the gRPC runtime and the protoc code-generation tools.
sudo python3 -m pip install grpcio
sudo python3 -m pip install grpcio-tools

# Generate the message and service modules. The output module names come from
# the .proto file name, so the file must be named product_info.proto to yield
# the product_info_pb2 / product_info_pb2_grpc modules imported below.
# Use the same interpreter (python3) that grpcio-tools was installed into.
python3 -m grpc_tools.protoc -I../../protos --python_out=. --grpc_python_out=. ../../protos/product_info.proto

2.2、Server

from concurrent import futures
import logging
import time
import uuid

import grpc

import product_info_pb2
import product_info_pb2_grpc


class ProductInfoServicer(product_info_pb2_grpc.ProductInfoServicer):
    """In-memory implementation of the ProductInfo service."""

    def __init__(self):
        # Maps the generated product ID (string) to the stored request message.
        self.productMap = {}

    def addProduct(self, request, context):
        """Store ``request`` under a new UUID and return that ID.

        The generated ID is also written to ``request.id`` before storing.
        """
        product_id = str(uuid.uuid1())
        request.id = product_id
        print("addProduct:request", request)
        self.productMap[product_id] = request
        response = product_info_pb2.ProductID(value=product_id)
        print("addProduct:response", response)
        return response

    def getProduct(self, request, context):
        """Return the product stored under ``request.value``.

        Terminates the RPC with NOT_FOUND when the ID is unknown, instead of
        letting a KeyError escape the handler.
        """
        print("getProduct:request", request)
        product_id = str(request.value)
        response = self.productMap.get(product_id)
        if response is None:
            # abort() raises, ending the RPC with the given status.
            context.abort(grpc.StatusCode.NOT_FOUND,
                          f"product {product_id} not found")
        print("getProduct:response", response)
        return response


def serve():
    """Start the gRPC server on port 50051 and block until interrupted."""
    # create a gRPC server
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    # use the generated function `add_ProductInfoServicer_to_server`
    # to add the defined class to the server
    product_info_pb2_grpc.add_ProductInfoServicer_to_server(
        ProductInfoServicer(), server)

    # listen on port 50051
    print('Starting server. Listening on port 50051.')
    server.add_insecure_port('[::]:50051')
    server.start()

    # since server.start() will not block,
    # a sleep-loop is added to keep alive
    try:
        while True:
            time.sleep(86400)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()

2.3、Client

import time

import grpc

import product_info_pb2
import product_info_pb2_grpc


def run():
    """Add one product through the ProductInfo service, then fetch it back."""
    # open a gRPC channel; the context manager closes it when done
    with grpc.insecure_channel('localhost:50051') as channel:
        # create a stub (client)
        stub = product_info_pb2_grpc.ProductInfoStub(channel)

        response = stub.addProduct(product_info_pb2.Product(
            name="Apple iPhone 11",
            description="Meet Apple iPhone 11. All-new dual-camera system with Ultra Wide and Night mode.",
            price=699.0,
        ))
        print("add product: response", response)

        # Look the product up again using the ID the server returned.
        productInfo = stub.getProduct(
            product_info_pb2.ProductID(value=response.value))
        print("get product: response", productInfo)


if __name__ == '__main__':
    run()


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Python从门到精通(七):网络-04-创建TCP和UDP服务接口(python tcp udp)
下一篇:SpringBoot数据校验功能的实现
相关文章

 发表评论

暂时没有评论,来抢沙发吧~