CLOVER🍀

That was when it all began.

PythonでEcho Server/Clientを書いてみる

これは、なにをしたくて書いたもの?

ちょっとしたPythonの勉強がてらに、TCPのEcho Server/Clientを書いてみようと。

Pythonでの、ネットワークプログラミングの基礎を覚えてみる感じで。

お題

文字通り、PythonでTCPでEcho Server/Clientを書くわけですが、特にサーバー側は以下の条件を満たすように作成します。

  • 受け取ったメッセージに対して「Reply: 」を付与して返す
  • 同時に複数の接続を扱える(ひとつの接続を相手にしている間、他の接続の処理を一切行えない、という状況にしない)
  • 停止はCtrl-c

クライアントの条件は、起動引数で受け取ったメッセージを送って、サーバーから受け取ったメッセージを標準出力に
書き出して終了、とします。

作成方法は、以下の3つ(+α)で行うようにします。

いずれも、Pythonのドキュメントを見ればだいたい答えは載っているのですが、自分で試してみるということで。

やってみた感想ですが、現時点でこういうのを作るのなら、asyncioのストリームを使うんでしょうかね?

環境

今回の環境は、こちら。

$ python3 -V
Python 3.6.7

OSは、Ubuntu Linux 18.04 LTSです。

ブロッキングIO+スレッドプールを使う

まずは、1番基本っぽいやつを。ソケットを使います。

18.1. socket --- 低水準ネットワークインターフェイス — Python 3.6.8 ドキュメント

使用例に、まんまEcho Server/Clientのことが書いています。

socket / 使用例

が、それだけだと複数の接続の相手ができないので、スレッドプールを使うことにします。

スレッドプールには、concurrent.futuresモジュールのThreadPoolExecutorを使用することにしましょう。

17.4. concurrent.futures -- 並列タスク実行 — Python 3.6.8 ドキュメント

ThreadPoolExecutor

threaded_echo_server.py

import socket
import threading

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

host = 'localhost'
port = 8080
bind_address = (host, port)

workers = 10

backlog_size = 10
recv_size = 1024


def handle(client_socket):
    remote_addr = client_socket.getpeername()
    
    print('[{}] {} - handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), threading.current_thread().getName(), remote_addr))
    
    with client_socket:
        while True:
            data = client_socket.recv(recv_size)

            if not data:
                break

            client_socket.send(b'Reply: ' + data)

    print('[{}] {} - handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), threading.current_thread().getName(), remote_addr))


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
    with ThreadPoolExecutor(max_workers = workers) as executor:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(bind_address)
        server_socket.listen(backlog_size)

        print('[{}] Server startup, thread-pool = {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), workers))

        try:
            while True:
                client_socket, addr = server_socket.accept()
                executor.submit(handle, client_socket)
        except KeyboardInterrupt:
            print('[{}] Server stop'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

まず、ソケットの作成。

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:

第1引数はアドレスファミリー、第2引数はソケットタイプです。他にも引数を指定可能ですが、そちらはドキュメントへ。

socket.socket

AF_INETアドレスファミリーでは、接続先のホスト、ポートのペアを指定してソケットにバインドします。
SOCK_STREAMというのは、TCPのことです(UDPは、SOCK_DGRAM)。

以下の処理では、ソケットに対してアドレスをバインドし、受け入れ可能なバックログを指定してリッスンを開始しています。

        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(bind_address)
        server_socket.listen(backlog_size)

「socket.SO_REUSEADDR」の部分は、TIME_WAIT状態になったソケットを再利用できるようにするオプションです。
これを指定しない場合は、プログラムを起動、終了後、再度起動しようとすると以下のエラーが発生することがあり、
その回避のためになります。

OSError: [Errno 98] Address already in use

あとは、作成しておいたスレッドプールに、socket.acceptで受信したソケットと、処理を行う関数を渡して、別スレッドでの
処理に移します。

        try:
            while True:
                client_socket, addr = server_socket.accept()
                executor.submit(handle, client_socket)
        except KeyboardInterrupt:
            print('[{}] Server stop'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

別スレッドから呼び出す処理の中身は、こちら。接続元をsocket.getpeernameで取得して出力しつつ、socket.recvでクライアントから
送信されてきたデータを読み込み、socket.sendで送り返す感じですね。

def handle(client_socket):
    remote_addr = client_socket.getpeername()
    
    print('[{}] {} - handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), threading.current_thread().getName(), remote_addr))
    
    with client_socket:
        while True:
            data = client_socket.recv(recv_size)

            if not data:
                break

            client_socket.send(b'Reply: ' + data)

    print('[{}] {} - handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), threading.current_thread().getName(), remote_addr))

スレッドプールは、max_workersで利用する最大のスレッド数を指定し

ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers = workers) as executor:

タスクの実行は、Executor.submitで行います。

Executor.submit

                executor.submit(handle, client_socket)

クライアント側は、こんな感じ。
echo_client.py

import socket
import sys

host = 'localhost'
port = 8080
server_address = (host, port)

recv_size = 1024

message = sys.argv[1]

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
    client_socket.connect(server_address)
    client_socket.send(message.encode('utf-8'))

    data = client_socket.recv(recv_size)

    print(data.decode('utf-8'))

サーバーとの接続はsocket.connectで行い、あとはsocket.sendとsocket.recvでデータの送受信を行います。

socket.connect

動作確認。サーバーを起動。

$ python3 threaded_echo_server.py 
[2019-05-04 20:22:56] Server startup, thread-pool = 10

クライアントからアクセス。

$ python3 echo_client.py 'Hello World'
Reply: Hello World
$ python3 echo_client.py 'こんにちは'
Reply: こんにちは

サーバー側には、こんなログが出力されます。

[2019-05-04 20:23:16] ThreadPoolExecutor-0_0 - handle connection, start - ('127.0.0.1', 40376)
[2019-05-04 20:23:16] ThreadPoolExecutor-0_0 - handle connection, exit - ('127.0.0.1', 40376)
[2019-05-04 20:23:21] ThreadPoolExecutor-0_0 - handle connection, start - ('127.0.0.1', 40378)
[2019-05-04 20:23:21] ThreadPoolExecutor-0_0 - handle connection, exit - ('127.0.0.1', 40378)

OKそうです。

複数の接続を扱えているかどうかは、クライアント側のプログラムをsleepさせたり、複数のターミナルでncコマンドを実行するなどして
確認するとよいでしょう。

$ echo こんにちは | nc localhost 8080
$ echo Hello | nc localhost 8080

ノンブロッキングIOを使う

続いて、ノンブロッキングIOを使って、同じことを行ってみます。

ノンブロッキングIOに関するモジュールを見ると、高水準のselectors、よりシステムコールに近いselectモジュールがあるようです。

18.4. selectors --- 高水準の I/O 多重化 — Python 3.6.8 ドキュメント

18.3. select --- I/O 処理の完了を待機する — Python 3.6.8 ドキュメント

基本的には、selectorsを使うようですね。

こちらも、サーバー側のみですがドキュメントに使用例が書かれています。

selectors / 使用例

まずは、サーバー側から。
non_blocking_echo_server.py

import selectors
import socket

from datetime import datetime

host = 'localhost'
port = 8080
bind_address = (host, port)

backlog_size = 10
recv_size = 1024

selector = selectors.DefaultSelector()


def accept(server_socket, mask):
    client_socket, addr = server_socket.accept()
    client_socket.setblocking(False)
    selector.register(client_socket, selectors.EVENT_READ, handle)

    print('[{}] handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), client_socket.getpeername()))

def handle(client_socket, mask):
    data = client_socket.recv(recv_size)
    if data:
        client_socket.send(b'Reply: ' + data)
    else:
        selector.unregister(client_socket)
        print('[{}] handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), client_socket.getpeername()))
        client_socket.close()


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(bind_address)
    server_socket.listen(backlog_size)
    server_socket.setblocking(False)
    selector.register(server_socket, selectors.EVENT_READ, accept)

    print('[{}] Server startup'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

    try:
        while True:
            events = selector.select()
            for key, mask in events:
                callback = key.data  # callback is accept or handle
                callback(key.fileobj, mask)  # callback function call
    except KeyboardInterrupt:
        print('[{}] Server stop'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

Selectorは、selectorsモジュールが提供するDefaultSelectorを使用します。

selector = selectors.DefaultSelector()

自分の環境では、EpollSelectorが利用されました。

ブロッキングIOの時と同様にソケットを使いますが、作成したソケットに対してsocket.setblockingでFalseに設定します。

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(bind_address)
    server_socket.listen(backlog_size)
    server_socket.setblocking(False)

これで、ソケットがノンブロッキングモードになります。

socket.setblocking

このソケットを、Selectorに登録します。

    selector.register(server_socket, selectors.EVENT_READ, accept)

あとは、Selector.selectでイベントを取得して、イベントのキーに紐付けられたコールバック関数を呼び出します。

        while True:
            events = selector.select()
            for key, mask in events:
                callback = key.data  # callback is accept or handle
                callback(key.fileobj, mask)  # callback function call

BaseSelector.select

SelectorKey

ここで呼び出されているコールバック関数は、先ほどSelectorに登録したaccept関数や

    selector.register(server_socket, selectors.EVENT_READ, accept)

accept関数の中で、再度Selector.registerで登録しているhandle関数のことです。

def accept(server_socket, mask):
    client_socket, addr = server_socket.accept()
    client_socket.setblocking(False)
    selector.register(client_socket, selectors.EVENT_READ, handle)

    print('[{}] handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), client_socket.getpeername()))

def handle(client_socket, mask):
    data = client_socket.recv(recv_size)
    if data:
        client_socket.send(b'Reply: ' + data)
    else:
        selector.unregister(client_socket)
        print('[{}] handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), client_socket.getpeername()))
        client_socket.close()

どちらも、読み込みイベントを、監視イベントとして指定しています。

    selector.register(server_socket, selectors.EVENT_READ, accept)

    selector.register(client_socket, selectors.EVENT_READ, handle)

accept関数では接続先のクライアントに対するソケットをノンブロッキングモードに設定し、Selectorに登録します。

handle関数では、クライアントソケットからデータを受信し、データがあればクライアントにsocket.sendでデータを送り返し、
データがなければSelectorから登録解除して切断します。

続いて、クライアント側。こちらもムダにノンブロッキングIOにして、Selectorも使ってみました。
non_blocking_echo_client.py

import selectors
import socket
import sys

host = 'localhost'
port = 8080
server_address = (host, port)

recv_size = 1024

message = sys.argv[1]

selector = selectors.DefaultSelector()

received = False


def write(client_socket, mask):
    client_socket.send(message.encode('utf-8'))
    selector.modify(client_socket, selectors.EVENT_READ, read)

def read(client_socket, mask):
    data = client_socket.recv(recv_size)
    selector.unregister(client_socket)
    print(data.decode('utf-8'))

    client_socket.close()

    received = True

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
    client_socket.connect(server_address)
    client_socket.setblocking(False)
    selector.register(client_socket, selectors.EVENT_WRITE, write)

    while not received:
        events = selector.select(0.1)

        if not events:
            break
        
        for key, mask in events:
            callback = key.data  # callback is write or read
            callback(key.fileobj, mask)  # callback function call

ソケットをsocket.setblockingでFalseを指定し、ノンブロッキングモードにするのはサーバー側と同じです。

クライアント側は、書き込み可能イベントを監視します。

    selector.register(client_socket, selectors.EVENT_WRITE, write)

そこから呼び出されるwrite関数ではソケットにデータを書き込み、今度は読み取り可能イベントを監視するように、ソケットを
Selectorに登録し直します。

def write(client_socket, mask):
    client_socket.send(message.encode('utf-8'))
    selector.modify(client_socket, selectors.EVENT_READ, read)

結果を受信したら、ソケットをSelectorから登録解除、切断しておしまいです。

def read(client_socket, mask):
    data = client_socket.recv(recv_size)
    selector.unregister(client_socket)
    print(data.decode('utf-8'))

    client_socket.close()

    received = True

最後になんかフラグっぽいものがついていますが、これはSelector.selectに引数を指定しないままだとタイムアウトしないので、
タイムアウトを設定して、結果を受信したらループを終えるようにしています。

    while not received:
        events = selector.select(0.1)

        if not events:
            break
        
        for key, mask in events:
            callback = key.data  # callback is write or read
            callback(key.fileobj, mask)  # callback function call

確認。サーバーを起動。

$ python3 non_blocking_echo_server.py 
[2019-05-04 20:57:33] Server startup

クライアントを実行。

$ python3 non_blocking_echo_client.py 'こんにちは'
Reply: こんにちは
$ python3 non_blocking_echo_client.py 'Hello'
Reply: Hello

サーバー側のログ。

[2019-05-04 20:57:48] handle connection, start - ('127.0.0.1', 40698)
[2019-05-04 20:57:48] handle connection, exit - ('127.0.0.1', 40698)
[2019-05-04 20:57:56] handle connection, start - ('127.0.0.1', 40700)
[2019-05-04 20:57:56] handle connection, exit - ('127.0.0.1', 40700)

asyncioを使う

最後は、asyncioです。

asyncioは、Python 3.4で追加されたものだそうです。

このモジュールは、コルーチン、ソケットあるいはその他リソースを使用した多重 I/O、ネットワーククライアントあるいはサーバーの実行、およびその他関連するプリミティブを使用した、シングルスレッド処理を並行で実行するコードを作成するためのインフラストラクチャを提供します。

18.5. asyncio --- 非同期 I/O、イベントループ、コルーチンおよびタスク — Python 3.6.8 ドキュメント

基底イベントループのモジュール、より便利なショートカットモジュールがあるようですが、

18.5.1. 基底イベントループ — Python 3.6.8 ドキュメント

18.5.2. イベントループ — Python 3.6.8 ドキュメント

今回のお題で動かすには、以下のどちらかのAPIを使って作成することになります。

  • Transports and protocols
  • ストリーム

18.5.4. Transports and protocols (callback based API) — Python 3.6.8 ドキュメント

18.5.5. ストリーム (コルーチンベースの API) — Python 3.6.8 ドキュメント

今回は、両方やってみたいと思います。

Transports and protocols

まずは、Transports and protocolsから。

18.5.4. Transports and protocols (callback based API) — Python 3.6.8 ドキュメント

こちらを使うと、TCP、UDP、サブプロセスパイプのプロトコルを使ってasyncioを活用した処理を書くことができます。

ドキュメントのサンプルは、こちら。

TCP Echo クライアントプロトコル

TCP Echo サーバープロトコル

ほぼまんま載っているわけですが、とりあえず気にしない。

まずは、サーバー側のプログラムから。
asyncio_echo_server.py

import asyncio

from datetime import datetime

host = 'localhost'
port = 8080


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        remote_addr = self.transport.get_extra_info('peername')
        print('[{}] handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr))

    def data_received(self, data):
        remote_addr = self.transport.get_extra_info('peername')
        self.transport.write(b'Reply: ' + data)
        print('[{}] handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr))
        self.transport.close()


loop = asyncio.get_event_loop()
coro = loop.create_server(EchoServerProtocol, host, port)
server = loop.run_until_complete(coro)

print('[{}] Server startup'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

try:
    loop.run_forever()
except KeyboardInterrupt:
    print('[{}] Server stop'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

こちらのAPIの利用方法では、Protocolクラスを継承したクラスを作成し、必要なメソッドをオーバーライドします。

class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        remote_addr = self.transport.get_extra_info('peername')
        print('[{}] handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr))

    def data_received(self, data):
        remote_addr = self.transport.get_extra_info('peername')
        self.transport.write(b'Reply: ' + data)
        print('[{}] handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr))
        self.transport.close()

継承するクラスによって、どのプロトコルに対応した処理を書くのかを選択します。

プロトコルクラス群

Protocolクラスを継承する場合は、TCPになります。

オーバーライドするメソッドは、プロトコル群の継承元のクラスによって変わりますが、Protocolクラスの場合は以下の2つです。

コネクションコールバック

ストリーミングプロトコル

今回は、接続時のログ出力とTransport保持のためにconnection_made、データの受信と送信のためにdata_receivedを
オーバーライドしています。

そして、イベントループと、接続を待ち受けるコネクションを作成し、コルーチンを実行します。

loop = asyncio.get_event_loop()
coro = loop.create_server(EchoServerProtocol, host, port)
server = loop.run_until_complete(coro)

AbstractEventLoop.create_serverは、TCPサーバーを作成します。このメソッドの戻り値は、コルーチンです。

AbstractEventLoop.create_server

コルーチンについては、こちらを。

18.5.3. タスクとコルーチン — Python 3.6.8 ドキュメント

AbstractEventLoop.run_until_completeにコルーチンを渡すと、Futureが返されます。

AbstractEventLoop.run_until_complete

ensure_future

あとはAbstractEventLoop.run_foreverで走らせ続け、Ctrl-cで止めたらFuture、イベントループをクローズしていきます。

try:
    loop.run_forever()
except KeyboardInterrupt:
    print('[{}] Server stop'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

ところでAbstractEventLoop.create_serverの実体がちょっと気になったので、見てみました。

socket.setblocking(False)を使っているので、(想像に難くはないですが)ノンブロッキングIOを使用します。

cpython/base_events.py at v3.6.7 · python/cpython · GitHub

Selectorは、指定しない場合はDefalutSelectorを使うようです。

cpython/selector_events.py at v3.6.7 · python/cpython · GitHub

*nix環境でのイベントループは、以下のクラスが実体です。

https://github.com/python/cpython/blob/v3.6.7/Lib/asyncio/unix_events.py#L49

参考までに、プロトコル群も。

https://github.com/python/cpython/blob/v3.6.7/Lib/asyncio/protocols.py

クライアント側。
asyncio_echo_client.py

import asyncio
import sys

host = 'localhost'
port = 8080

message = sys.argv[1]


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        self.transport = transport
        self.transport.write(self.message.encode('utf-8'))

    def data_received(self, data):
        print(data.decode('utf-8'))
        self.transport.close()

    def connection_lost(self, exc):
        self.loop.stop()


loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop), host, port)

loop.run_until_complete(coro)
loop.run_forever()
loop.close()

こちらは、サーバーとの接続にAbstractEventLoop.create_connectionを使い、

coro = loop.create_connection(lambda: EchoClientProtocol(message, loop), host, port)

終了したら、イベントループもクローズします。

loop.run_until_complete(coro)
loop.run_forever()
loop.close()

終了の契機は、data_receivedメソッドでTransportをクローズすることで起こすようにしました。

    def data_received(self, data):
        print(data.decode('utf-8'))
        self.transport.close()

ちなみに、AbstractEventLoop.create_connectionで作成されるソケットも、ノンブロッキングモードですね。

https://github.com/python/cpython/blob/v3.6.7/Lib/asyncio/base_events.py#L822

確認については、省略。

ストリーム

最後に、ストリームを使って書いてみます。

18.5.5. ストリーム (コルーチンベースの API) — Python 3.6.8 ドキュメント

ストリームを使うと、StreamReaderとStreamWriter、そしてコルーチンを使って処理を書くことができます。

StreamReader

StreamWriter

こちらも、ドキュメント内にEchoのサンプルがあります。

ストリームを使った TCP Echo クライアント

ストリームを使った TCP Echo サーバー

サーバー側は、こんな感じで作成。
asyncio_stream_echo_server.py

import asyncio

from datetime import datetime

host = 'localhost'
port = 8080

read_size = 1024

async def handle_echo(reader, writer):
    remote_addr = writer.get_extra_info('peername')
    print('[{}] handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr))

    data = await reader.read(read_size)
    # data = await reader.readline()
    writer.write(b'Reply: ' + data)

    await writer.drain()

    print('[{}] handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr))
    writer.close()


loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, host, port, loop = loop)
server = loop.run_until_complete(coro)

print('[{}] Server startup'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

try:
    loop.run_forever()
except KeyboardInterrupt:
    print('[{}] Server stop'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

start_server関数を使って、関数を紐付けつつサーバーを起動します。

loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, host, port, loop = loop)
server = loop.run_until_complete(coro)

ここで渡される関数はコルーチンで、引数にはStreamReader、StreamWriterが渡ってきます。

async def handle_echo(reader, writer):
    remote_addr = writer.get_extra_info('peername')
    print('[{}] handle connection, start - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr))

    data = await reader.read(read_size)
    # data = await reader.readline()
    writer.write(b'Reply: ' + data)

    await writer.drain()

    print('[{}] handle connection, exit - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), remote_addr))
    writer.close()

ドキュメントのサンプルとは異なり、@asyncio.coroutineをasync defに、yield fromをawaitに書き直しています。
※Python 3.7のドキュメントを見たら、async defとawaitになっていました

StreamReader、StreamWriterのメソッドはコルーチンなので、awaitで結果の待ち合わせをしています。

StreamReader.readで指定されたバイト数のデータを読み込みますが(戻り値はコルーチン)、コメントアウトしているように
StreamReader.readlineを使うのも、今回のケースではありです。

書き込みは、StreamWriter.writeで行います。バッファのフラッシュはStreamWriter.drainで行い、ここはawaitを併用します。

なお、start_serverはAbstractEventLoop.create_serverを呼び出しているようです。

https://github.com/python/cpython/blob/v3.6.7/Lib/asyncio/streams.py#L119

クライアント側。
asyncio_stream_echo_client.py

import asyncio
import sys

host = 'localhost'
port = 8080

read_size = 1024

message = sys.argv[1]


async def echo_client(message, loop):
    reader, writer = await asyncio.open_connection(host, port, loop = loop)

    writer.write(message.encode('utf-8'))

    data = await reader.read(read_size)
    # data = await reader.readline()
    print(data.decode('utf-8'))

    writer.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(echo_client(message, loop))
loop.close()

似たり寄ったりなので、こちらは省略。

全体的に、Transports and protocolsでの書き方よりはシンプルな感じがすると思うのですが…。

まとめ

いろんなパターンでEcho Server/Clientを書いてみました。

それぞれの内容が、別のパターンの基礎になっていたりするので、ドキュメントをなぞったりソースコードを見ていくうえで、
勉強になったなぁという気がします。