これは、なにをしたくて書いたもの?
ちょっとしたPythonの勉強がてらに、TCPのEcho Server/Clientを書いてみようと。
Pythonでの、ネットワークプログラミングの基礎を覚えてみる感じで。
お題
文字通り、PythonでTCPでEcho Server/Clientを書くわけですが、特にサーバー側は以下の条件を満たすように作成します。
- 受け取ったメッセージに対して「Reply: 」を付与して返す
- 同時に複数の接続を扱える(ひとつの接続を相手にしている間、他の接続の処理を一切行えない、という状況にしない)
- 停止はCtrl-c
クライアントの条件は、起動引数で受け取ったメッセージを送って、サーバーから受け取ったメッセージを標準出力に
書き出して終了、とします。
作成方法は、以下の3つ(+α)で行うようにします。
いずれも、Pythonのドキュメントを見ればだいたい答えは載っているのですが、自分で試してみるということで。
やってみた感想ですが、現時点でこういうのを作るのなら、asyncioのストリームを使うんでしょうかね?
環境
今回の環境は、こちら。
$ python3 -V Python 3.6.7
OSは、Ubuntu Linux 18.04 LTSです。
ブロッキングIO+スレッドプールを使う
まずは、1番基本っぽいやつを。ソケットを使います。
18.1. socket --- 低水準ネットワークインターフェイス — Python 3.6.8 ドキュメント
使用例に、まんまEcho Server/Clientのことが書いています。
が、それだけだと複数の接続の相手ができないので、スレッドプールを使うことにします。
スレッドプールには、concurrent.futuresモジュールのThreadPoolExecutorを使用することにしましょう。
17.4. concurrent.futures -- 並列タスク実行 — Python 3.6.8 ドキュメント
threaded_echo_server.py
import socket import threading from concurrent.futures import ThreadPoolExecutor from datetime import datetime host = 'localhost' port = 8080 bind_address = (host, port) workers = 10 backlog_size = 10 recv_size = 1024 def handle(client_socket): remote_addr = client_socket.getpeername() print('[{}] {} - handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), threading.current_thread().getName(), remote_addr)) with client_socket: while True: data = client_socket.recv(recv_size) if not data: break client_socket.send(b'Reply: ' + data) print('[{}] {} - handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), threading.current_thread().getName(), remote_addr)) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: with ThreadPoolExecutor(max_workers = workers) as executor: server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(bind_address) server_socket.listen(backlog_size) print('[{}] Server startup, thread-pool = {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), workers)) try: while True: client_socket, addr = server_socket.accept() executor.submit(handle, client_socket) except KeyboardInterrupt: print('[{}] Server stop'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
まず、ソケットの作成。
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
第1引数はアドレスファミリー、第2引数はソケットタイプです。他にも引数を指定可能ですが、そちらはドキュメントへ。
AF_INETアドレスファミリーでは、接続先のホスト、ポートのペアを指定してソケットにバインドします。
SOCK_STREAMというのは、TCPのことです(UDPは、SOCK_DGRAM)。
以下の処理では、ソケットに対してアドレスをバインドし、受け入れ可能なバックログを指定してリッスンを開始しています。
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(bind_address)
server_socket.listen(backlog_size)
「socket.SO_REUSEADDR」の部分は、TIME_WAIT状態になったソケットを再利用できるようにするオプションです。
これを指定しない場合は、プログラムを起動、終了後、再度起動しようとすると以下のエラーが発生することがあり、
その回避のためになります。
OSError: [Errno 98] Address already in use
あとは、作成しておいたスレッドプールに、socket.acceptで受信したソケットと、処理を行う関数を渡して、別スレッドでの
処理に移します。
try: while True: client_socket, addr = server_socket.accept() executor.submit(handle, client_socket) except KeyboardInterrupt: print('[{}] Server stop'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
別スレッドから呼び出す処理の中身は、こちら。接続元をsocket.getpeernameで取得して出力しつつ、socket.recvでクライアントから
送信されてきたデータを読み込み、socket.sendで送り返す感じですね。
def handle(client_socket): remote_addr = client_socket.getpeername() print('[{}] {} - handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), threading.current_thread().getName(), remote_addr)) with client_socket: while True: data = client_socket.recv(recv_size) if not data: break client_socket.send(b'Reply: ' + data) print('[{}] {} - handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), threading.current_thread().getName(), remote_addr))
スレッドプールは、max_workersで利用する最大のスレッド数を指定し
with ThreadPoolExecutor(max_workers = workers) as executor:
タスクの実行は、Executor.submitで行います。
executor.submit(handle, client_socket)
クライアント側は、こんな感じ。
echo_client.py
import socket import sys host = 'localhost' port = 8080 server_address = (host, port) recv_size = 1024 message = sys.argv[1] with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket: client_socket.connect(server_address) client_socket.send(message.encode('utf-8')) data = client_socket.recv(recv_size) print(data.decode('utf-8'))
サーバーとの接続はsocket.connectで行い、あとはsocket.sendとsocket.recvでデータの送受信を行います。
動作確認。サーバーを起動。
$ python3 threaded_echo_server.py [2019-05-04 20:22:56] Server startup, thread-pool = 10
クライアントからアクセス。
$ python3 echo_client.py 'Hello World' Reply: Hello World $ python3 echo_client.py 'こんにちは' Reply: こんにちは
サーバー側には、こんなログが出力されます。
[2019-05-04 20:23:16] ThreadPoolExecutor-0_0 - handle connection, start - ('127.0.0.1', 40376) [2019-05-04 20:23:16] ThreadPoolExecutor-0_0 - handle connection, exit - ('127.0.0.1', 40376) [2019-05-04 20:23:21] ThreadPoolExecutor-0_0 - handle connection, start - ('127.0.0.1', 40378) [2019-05-04 20:23:21] ThreadPoolExecutor-0_0 - handle connection, exit - ('127.0.0.1', 40378)
OKそうです。
複数の接続を扱えているかどうかは、クライアント側のプログラムをsleepさせたり、複数のターミナルでncコマンドを実行するなどして
確認するとよいでしょう。
$ echo こんにちは | nc localhost 8080 $ echo Hello | nc localhost 8080
ノンブロッキングIOを使う
続いて、ノンブロッキングIOを使って、同じことを行ってみます。
ノンブロッキングIOに関するモジュールを見ると、高水準のselectors、よりシステムコールに近いselectモジュールがあるようです。
18.4. selectors --- 高水準の I/O 多重化 — Python 3.6.8 ドキュメント
18.3. select --- I/O 処理の完了を待機する — Python 3.6.8 ドキュメント
基本的には、selectorsを使うようですね。
こちらも、サーバー側のみですがドキュメントに使用例が書かれています。
まずは、サーバー側から。
non_blocking_echo_server.py
import selectors import socket from datetime import datetime host = 'localhost' port = 8080 bind_address = (host, port) backlog_size = 10 recv_size = 1024 selector = selectors.DefaultSelector() def accept(server_socket, mask): client_socket, addr = server_socket.accept() client_socket.setblocking(False) selector.register(client_socket, selectors.EVENT_READ, handle) print('[{}] handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), client_socket.getpeername())) def handle(client_socket, mask): data = client_socket.recv(recv_size) if data: client_socket.send(b'Reply: ' + data) else: selector.unregister(client_socket) print('[{}] handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), client_socket.getpeername())) client_socket.close() with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(bind_address) server_socket.listen(backlog_size) server_socket.setblocking(False) selector.register(server_socket, selectors.EVENT_READ, accept) print('[{}] Server startup'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) try: while True: events = selector.select() for key, mask in events: callback = key.data # callback is accept or handle callback(key.fileobj, mask) # callback function call except KeyboardInterrupt: print('[{}] Server stop'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
Selectorは、selectorsモジュールが提供するDefaultSelectorを使用します。
selector = selectors.DefaultSelector()
自分の環境では、EpollSelectorが利用されました。
ブロッキングIOの時と同様にソケットを使いますが、作成したソケットに対してsocket.setblockingでFalseに設定します。
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(bind_address) server_socket.listen(backlog_size) server_socket.setblocking(False)
これで、ソケットがノンブロッキングモードになります。
このソケットを、Selectorに登録します。
selector.register(server_socket, selectors.EVENT_READ, accept)
あとは、Selector.selectでイベントを取得して、イベントのキーに紐付けられたコールバック関数を呼び出します。
while True: events = selector.select() for key, mask in events: callback = key.data # callback is accept or handle callback(key.fileobj, mask) # callback function call
ここで呼び出されているコールバック関数は、先ほどSelectorに登録したaccept関数や
selector.register(server_socket, selectors.EVENT_READ, accept)
accept関数の中で、再度Selector.registerで登録しているhandle関数のことです。
def accept(server_socket, mask): client_socket, addr = server_socket.accept() client_socket.setblocking(False) selector.register(client_socket, selectors.EVENT_READ, handle) print('[{}] handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), client_socket.getpeername())) def handle(client_socket, mask): data = client_socket.recv(recv_size) if data: client_socket.send(b'Reply: ' + data) else: selector.unregister(client_socket) print('[{}] handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), client_socket.getpeername())) client_socket.close()
どちらも、読み込みイベントを、監視イベントとして指定しています。
selector.register(server_socket, selectors.EVENT_READ, accept) selector.register(client_socket, selectors.EVENT_READ, handle)
accept関数では接続先のクライアントに対するソケットをノンブロッキングモードに設定し、Selectorに登録します。
handle関数では、クライアントソケットからデータを受信し、データがあればクライアントにsocket.sendでデータを送り返し、
データがなければSelectorから登録解除して切断します。
続いて、クライアント側。こちらもムダにノンブロッキングIOにして、Selectorも使ってみました。
non_blocking_echo_client.py
import selectors import socket import sys host = 'localhost' port = 8080 server_address = (host, port) recv_size = 1024 message = sys.argv[1] selector = selectors.DefaultSelector() received = False def write(client_socket, mask): client_socket.send(message.encode('utf-8')) selector.modify(client_socket, selectors.EVENT_READ, read) def read(client_socket, mask): data = client_socket.recv(recv_size) selector.unregister(client_socket) print(data.decode('utf-8')) client_socket.close() received = True with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket: client_socket.connect(server_address) client_socket.setblocking(False) selector.register(client_socket, selectors.EVENT_WRITE, write) while not received: events = selector.select(0.1) if not events: break for key, mask in events: callback = key.data # callback is write or read callback(key.fileobj, mask) # callback function call
ソケットをsocket.setblockingでFalseを指定し、ノンブロッキングモードにするのはサーバー側と同じです。
クライアント側は、書き込み可能イベントを監視します。
selector.register(client_socket, selectors.EVENT_WRITE, write)
そこから呼び出されるwrite関数ではソケットにデータを書き込み、今度は読み取り可能イベントを監視するように、ソケットを
Selectorに登録し直します。
def write(client_socket, mask): client_socket.send(message.encode('utf-8')) selector.modify(client_socket, selectors.EVENT_READ, read)
結果を受信したら、ソケットをSelectorから登録解除、切断しておしまいです。
def read(client_socket, mask): data = client_socket.recv(recv_size) selector.unregister(client_socket) print(data.decode('utf-8')) client_socket.close() received = True
最後になんかフラグっぽいものがついていますが、これはSelector.selectに引数を指定しないままだとタイムアウトしないので、
タイムアウトを設定して、結果を受信したらループを終えるようにしています。
while not received: events = selector.select(0.1) if not events: break for key, mask in events: callback = key.data # callback is write or read callback(key.fileobj, mask) # callback function call
確認。サーバーを起動。
$ python3 non_blocking_echo_server.py [2019-05-04 20:57:33] Server startup
クライアントを実行。
$ python3 non_blocking_echo_client.py 'こんにちは' Reply: こんにちは $ python3 non_blocking_echo_client.py 'Hello' Reply: Hello
サーバー側のログ。
[2019-05-04 20:57:48] handle connection, start - ('127.0.0.1', 40698) [2019-05-04 20:57:48] handle connection, exit - ('127.0.0.1', 40698) [2019-05-04 20:57:56] handle connection, start - ('127.0.0.1', 40700) [2019-05-04 20:57:56] handle connection, exit - ('127.0.0.1', 40700)
asyncioを使う
最後は、asyncioです。
asyncioは、Python 3.4で追加されたものだそうです。
このモジュールは、コルーチン、ソケットあるいはその他リソースを使用した多重 I/O、ネットワーククライアントあるいはサーバーの実行、およびその他関連するプリミティブを使用した、シングルスレッド処理を並行で実行するコードを作成するためのインフラストラクチャを提供します。
18.5. asyncio --- 非同期 I/O、イベントループ、コルーチンおよびタスク — Python 3.6.8 ドキュメント
基底イベントループのモジュール、より便利なショートカットモジュールがあるようですが、
18.5.1. 基底イベントループ — Python 3.6.8 ドキュメント
18.5.2. イベントループ — Python 3.6.8 ドキュメント
今回のお題で動かすには、以下のどちらかのAPIを使って作成することになります。
- Transports and protocols
- ストリーム
18.5.4. Transports and protocols (callback based API) — Python 3.6.8 ドキュメント
18.5.5. ストリーム (コルーチンベースの API) — Python 3.6.8 ドキュメント
今回は、両方やってみたいと思います。
Transports and protocols
まずは、Transports and protocolsから。
18.5.4. Transports and protocols (callback based API) — Python 3.6.8 ドキュメント
こちらを使うと、TCP、UDP、サブプロセスパイプのプロトコルを使ってasyncioを活用した処理を書くことができます。
ドキュメントのサンプルは、こちら。
ほぼまんま載っているわけですが、とりあえず気にしない。
まずは、サーバー側のプログラムから。
asyncio_echo_server.py
import asyncio from datetime import datetime host = 'localhost' port = 8080 class EchoServerProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport remote_addr = self.transport.get_extra_info('peername') print('[{}] handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr)) def data_received(self, data): remote_addr = self.transport.get_extra_info('peername') self.transport.write(b'Reply: ' + data) print('[{}] handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr)) self.transport.close() loop = asyncio.get_event_loop() coro = loop.create_server(EchoServerProtocol, host, port) server = loop.run_until_complete(coro) print('[{}] Server startup'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) try: loop.run_forever() except KeyboardInterrupt: print('[{}] Server stop'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) server.close() loop.run_until_complete(server.wait_closed()) loop.close()
こちらのAPIの利用方法では、Protocolクラスを継承したクラスを作成し、必要なメソッドをオーバーライドします。
class EchoServerProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport remote_addr = self.transport.get_extra_info('peername') print('[{}] handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr)) def data_received(self, data): remote_addr = self.transport.get_extra_info('peername') self.transport.write(b'Reply: ' + data) print('[{}] handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr)) self.transport.close()
継承するクラスによって、どのプロトコルに対応した処理を書くのかを選択します。
Protocolクラスを継承する場合は、TCPになります。
オーバーライドするメソッドは、プロトコル群の継承元のクラスによって変わりますが、Protocolクラスの場合は以下の2つです。
今回は、接続時のログ出力とTransport保持のためにconnection_made、データの受信と送信のためにdata_receivedを
オーバーライドしています。
そして、イベントループと、接続を待ち受けるコネクションを作成し、コルーチンを実行します。
loop = asyncio.get_event_loop() coro = loop.create_server(EchoServerProtocol, host, port) server = loop.run_until_complete(coro)
AbstractEventLoop.create_serverは、TCPサーバーを作成します。このメソッドの戻り値は、コルーチンです。
AbstractEventLoop.create_server
コルーチンについては、こちらを。
18.5.3. タスクとコルーチン — Python 3.6.8 ドキュメント
AbstractEventLoop.run_until_completeにコルーチンを渡すと、Futureが返されます。
AbstractEventLoop.run_until_complete
あとはAbstractEventLoop.run_foreverで走らせ続け、Ctrl-cで止めたらFuture、イベントループをクローズしていきます。
try: loop.run_forever() except KeyboardInterrupt: print('[{}] Server stop'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) server.close() loop.run_until_complete(server.wait_closed()) loop.close()
ところでAbstractEventLoop.create_serverの実体がちょっと気になったので、見てみました。
socket.setblocking(False)を使っているので、(想像に難くはないですが)ノンブロッキングIOを使用します。
cpython/base_events.py at v3.6.7 · python/cpython · GitHub
Selectorは、指定しない場合はDefalutSelectorを使うようです。
cpython/selector_events.py at v3.6.7 · python/cpython · GitHub
*nix環境でのイベントループは、以下のクラスが実体です。
https://github.com/python/cpython/blob/v3.6.7/Lib/asyncio/unix_events.py#L49
参考までに、プロトコル群も。
https://github.com/python/cpython/blob/v3.6.7/Lib/asyncio/protocols.py
クライアント側。
asyncio_echo_client.py
import asyncio import sys host = 'localhost' port = 8080 message = sys.argv[1] class EchoClientProtocol(asyncio.Protocol): def __init__(self, message, loop): self.message = message self.loop = loop def connection_made(self, transport): self.transport = transport self.transport.write(self.message.encode('utf-8')) def data_received(self, data): print(data.decode('utf-8')) self.transport.close() def connection_lost(self, exc): self.loop.stop() loop = asyncio.get_event_loop() coro = loop.create_connection(lambda: EchoClientProtocol(message, loop), host, port) loop.run_until_complete(coro) loop.run_forever() loop.close()
こちらは、サーバーとの接続にAbstractEventLoop.create_connectionを使い、
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop), host, port)
終了したら、イベントループもクローズします。
loop.run_until_complete(coro) loop.run_forever() loop.close()
終了の契機は、data_receivedメソッドでTransportをクローズすることで起こすようにしました。
def data_received(self, data): print(data.decode('utf-8')) self.transport.close()
ちなみに、AbstractEventLoop.create_connectionで作成されるソケットも、ノンブロッキングモードですね。
https://github.com/python/cpython/blob/v3.6.7/Lib/asyncio/base_events.py#L822
確認については、省略。
ストリーム
最後に、ストリームを使って書いてみます。
18.5.5. ストリーム (コルーチンベースの API) — Python 3.6.8 ドキュメント
ストリームを使うと、StreamReaderとStreamWriter、そしてコルーチンを使って処理を書くことができます。
こちらも、ドキュメント内にEchoのサンプルがあります。
サーバー側は、こんな感じで作成。
asyncio_stream_echo_server.py
import asyncio from datetime import datetime host = 'localhost' port = 8080 read_size = 1024 async def handle_echo(reader, writer): remote_addr = writer.get_extra_info('peername') print('[{}] handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr)) data = await reader.read(read_size) # data = await reader.readline() writer.write(b'Reply: ' + data) await writer.drain() print('[{}] handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr)) writer.close() loop = asyncio.get_event_loop() coro = asyncio.start_server(handle_echo, host, port, loop = loop) server = loop.run_until_complete(coro) print('[{}] Server startup'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) try: loop.run_forever() except KeyboardInterrupt: print('[{}] Server stop'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) server.close() loop.run_until_complete(server.wait_closed()) loop.close()
start_server関数を使って、関数を紐付けつつサーバーを起動します。
loop = asyncio.get_event_loop() coro = asyncio.start_server(handle_echo, host, port, loop = loop) server = loop.run_until_complete(coro)
ここで渡される関数はコルーチンで、引数にはStreamReader、StreamWriterが渡ってきます。
async def handle_echo(reader, writer): remote_addr = writer.get_extra_info('peername') print('[{}] handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr)) data = await reader.read(read_size) # data = await reader.readline() writer.write(b'Reply: ' + data) await writer.drain() print('[{}] handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr)) writer.close()
ドキュメントのサンプルとは異なり、@asyncio.coroutineをasync defに、yield fromをawaitに書き直しています。
※Python 3.7のドキュメントを見たら、async defとawaitになっていました
StreamReader、StreamWriterのメソッドはコルーチンなので、awaitで結果の待ち合わせをしています。
StreamReader.readで指定されたバイト数のデータを読み込みますが(戻り値はコルーチン)、コメントアウトしているように
StreamReader.readlineを使うのも、今回のケースではありです。
書き込みは、StreamWriter.writeで行います。バッファのフラッシュはStreamWriter.drainで行い、ここはawaitを併用します。
なお、start_serverはAbstractEventLoop.create_serverを呼び出しているようです。
https://github.com/python/cpython/blob/v3.6.7/Lib/asyncio/streams.py#L119
クライアント側。
asyncio_stream_echo_client.py
import asyncio import sys host = 'localhost' port = 8080 read_size = 1024 message = sys.argv[1] async def echo_client(message, loop): reader, writer = await asyncio.open_connection(host, port, loop = loop) writer.write(message.encode('utf-8')) data = await reader.read(read_size) # data = await reader.readline() print(data.decode('utf-8')) writer.close() loop = asyncio.get_event_loop() loop.run_until_complete(echo_client(message, loop)) loop.close()
似たり寄ったりなので、こちらは省略。
全体的に、Transports and protocolsでの書き方よりはシンプルな感じがすると思うのですが…。
まとめ
いろんなパターンでEcho Server/Clientを書いてみました。
それぞれの内容が、別のパターンの基礎になっていたりするので、ドキュメントをなぞったりソースコードを見ていくうえで、
勉強になったなぁという気がします。