RSocket RPCをJavaScriptで試す

Node.jsでRSocket RPCのサーバを試してみようと思ったらドキュメントが見当たらずソースとか誰かの実装サンプル見ながら使い方調べてみたのでまとめました。

RSocketとは

RSocket とはReactiveStreamsをサポートしたアプリケーションプロトコルです。
RSocketの概要に関してはSpring Fest 2019@making さんが発表した資料がわかりやすいのでおすすめです。
https://bit.ly/jsug-rsocket

RSocket RPCとは

RSocketは普通に実装する場合はメッセージやり取りして中身見てハンドリングして処理を行います。
素のRSocketとは別にgRPCっぽい感じでProtocol Buffersからコードを生成してRPC風に処理を呼び出せるRSocket RPCが提供されています。

RSocket自体はTransportにWebSocketが使えるのでブラウザからもRSocket RPCが利用できWebSocketなのでWebサーバと同一ポートで運用することも可能です。

このあと出てくるサンプルの前提

  • サーバ、クライアント共にJavaScript
  • SPAを想定してサーバ側はWebサーバ兼RSocket RPCサーバな構成で同一ポートでリクエストを受け付ける

コードはGitHub に上がってるのでコード見たほうが早い派の人はそちらをどうぞ。

依存のインストール

protoc

Protocol Buffersの定義ファイルからコードを生成するのに使います。
環境に合わせた方法でインストールする必要がありますがMacならHomebrewから楽にインストールできます。

$ brew install protobuf

rsocket-rpc-protobuf

RSocket RPCのProtocol Buffers関連ユーティリティパッケージでprotocのプラグインやRSocket RPC向けのProtocol Buffersの定義ファイルが含まれていてコードを生成する時に使います。

$ yarn add -D rsocket-rpc-protobuf

上記のパッケージとは別に生成されるコードの中でrequireされているパッケージがいくつかあるので個別でインストールする必要があります。
今回の例ではメトリクスやトレーシングはやってないので関係ないのですがエラーになるので必要です。

$ yarn add google-protobuf rsocket-flowable rsocket-rpc-metrics rsocket-rpc-tracing

rsocket-rpc-core

RSocket RPCの諸々パッケージです。クライアントのクラスとリクエストハンドラなどのライブラリが含まれています。

$ yarn add rsocket-rpc-core

rsocket-websocket-server

Transportに使うWebSocketのサーバ側のライブラリです。

$ yarn add rsocket-websocket-server

rsocket-websocket-client

Transportに使うWebSocketのクライアント側のライブラリです。

$ yarn add rsocket-websocket-client

その他諸々

今回の例ではExpressでWebサーバを動かしますがこの辺はお好みで。

$ yarn add express

クライアント側も実装する場合はトランスパイルが必要です。

$ yarn add -D webpack

Protocol Buffersの定義ファイル

まずリクエストパラメータとレスポンスとそれらを扱うサービスを定義します。
以下の例はRequest Responseでリクエストしてレスポンスを受け取る例でほぼgRPCと同じです。
Fire and Forgetなど他のパターンの場合だとRSocket RPCっぽい感じの記述も必要になります。

# helloworld.proto
syntax = "proto3";

package helloworld;

service Hello {
  rpc sayHello (HelloRequest) returns (HelloResponse) {}
}

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}

定義ファイルが書けたらコードをビルドします。

$ protoc --proto_path=. --js_out=import_style=commonjs,binary:. --rsocket_rpc_out=. --plugin=protoc-gen-rsocket_rpc=node_modules/.bin/rsocket_rpc_js_protoc_plugin helloworld.proto

--proto_path--js_outあたりはJavaScript向けにProtocol Buffersのコード生成する時の一般的なオプションなので細かい説明は省略。
--rsocket_rpc_outでRSocket RPC用の生成されるコードの出力先を指定します。
--pluginでRSocket RPC用のコードを生成するためのプラグインのパスを指定します。
プラグインは依存で追加したrsocket-rpc-protobufに含まれていてnode_modules/.bin/にバイナリへのシンボリックリンクがあるのでそれをパスに指定します。

生成に成功すると以下のようなファイルが生成されます。

helloworld_pb.js
helloworld_rsocket_pb.js

サーバ側

処理の実装

RPCで呼ばれる処理を実装します。
Javaだとprotocでインターフェイスが生成されますがJavaScript向けにはそういった気の利いたものは提供されないようなので自力でお作法通りの挙動をするコードを用意する必要があります。

const { HelloResponse } = require('./protos/helloworld_pb');
const { Single } = require('rsocket-flowable');

class HelloService {
  constructor() {}

  sayHello(message, metadata) {
    const name = message.getName() || 'world';
    let response = new HelloResponse();
    response.setMessage(`Hello ${name}!`);
    return Single.of(response);
  }
};

ポイントはこんな感じです。

  • 定義ファイルに記述したRPC名のメソッド
  • 第一引数はRPCのリクエストパラメータのメッセージ
  • 第二引数はメタデータ
  • Request Responseの場合はSingle、Streamを返す場合はFlowableにレスポンスのメッセージを突っ込んで返す

上記の例だとクラスで実装してますがオブジェクトのRPC名のプロパティを関数として呼び出しするだけっぽいので多分ただのオブジェクトでも大丈夫です。

リクエストハンドラの実装

リクエストが来た時に呼び出されるサービスを登録します。

const { RequestHandlingRSocket } = require('rsocket-rpc-core');
const HelloService = require('./HelloService');
const { HelloServer } = require('./helloworld_rsocket_pb');

const getRequestHandler = () => {
  // ハンドラ用のライブラリ
  const service = new RequestHandlingRSocket();

  // 処理を書いたサービス(上で書いたクラス)
  const helloService = new HelloService();

  // ハンドラに登録
  // 第一引数 {パッケージ名}.{サービス名}の文字列
  // 第二引数 自動生成されたクラスにサービス渡してnewしたやつ
  service.addService('helloworld.Hello', new HelloServer(helloService));

  return service;
};

ややこしいので大まかな処理はサンプルコードを参照してください。
{サービス名}Serverという名前で自動生成されるクラスですがサーバでもなんでも無くて、RSocketのリクエストデータからどのメソッドを呼ぶかのハンドリング処理を行ったり、レスポンスやエラーの処理を行っているだけみたいです。
単一サービスの場合はそのままreturnしても動くのですが、複数サービスの場合はすべて呼び出せるようにする必要があるためサービスのハンドリングを行ってくれるRequestHandlingRSocketを使います。

Express周りの実装

今回の例は同一ポートでWebサーバとRSocket RPCのサーバを動かす例なのでExpressでWebサーバの部分を実装します。
http.createServer()に突っ込める系のフレームワークなら多分なんでも大丈夫なのでExpressである必要は無いです。

const express = require('express');
const http = require('http');

const app = express();

app.use(express.static('public'));

const server = http.createServer(app);

ただのWebサーバならapp.listen()を呼べばサーバが起動しますが、WebSocketと連携するのでhttp.createServer()に突っ込んでhttp.Serverのインスタンスを受け取るのがポイントです。

RSocket RPCサーバの実装

残りのRSocket RPCサーバの部分の実装です。

const { RSocketServer, BufferEncoders } = require('rsocket-core');
const RSocketWebsocketServer = require('rsocket-websocket-server').default;
const { RequestHandlingRSocket } = require('rsocket-rpc-core');

// 【中略】上記のExpress周りの実装

const transportOpts = {
  server,
  path: '/rsocket'
};
const transport = new RSocketWebsocketServer(transportOpts, BufferEncoders);

// 【中略】上記のリクエストハンドラの実装

const rSocketServer = new RSocketServer({ transport, getRequestHandler });

まず、TransportのRSocketWebsocketServerのインスタンスを作成します。
第一引数のオプションはws のサーバのオプションになっていてここからWebSocketの設定を行います。
必須なのはhttp.Serverのインスタンスとリクエストを受け付けるパスでその他の設定はお好みで。
第二引数はクライアントとのやり取りに使うエンコードが選べるらしくそれのエンコーダを設定します。
クライアント側でも同様の設定を行うのですが必ずクライアント側と同じエンコーダを設定する必要があります。
違うエンコーダを設定してしまったとしても親切なエラーなんて出なくてJavaScriptの未定義プロパティへのアクセスのエラーとかがでてデバッグで苦しむことになるので注意してください。
Transportの設定ができたらRSocketServerRSocketWebsocketServerのインスタンスとリクエストハンドラを定義した関数を渡してインスタンスを作成します。

サーバ起動

最後にサーバを起動します。
Webサーバが起動しただけだとRSocketのサーバが起動してくれないので別で起動用のメソッドを呼ぶ必要があります。

server.listen(3000, () => {
  rSocketServer.start();
  console.log('Server started on port 3000');
});

クライアント側

import RSocketWebSocketClient from 'rsocket-websocket-client';
import { BufferEncoders } from 'rsocket-core';
import { RpcClient } from 'rsocket-rpc-core';

import { HelloClient } from './protos/helloworld_rsocket_pb';
import { HelloRequest } from './protos/helloworld_pb';

(async () => {
  const transportOpts = { url: 'ws://localhost:3000/rsocket' };
  const transport = new RSocketWebSocketClient(transportOpts, BufferEncoders);

  const clientOpts = {
    setup: {
      keepAlive: 10000,
      lifetime: 20000,
    },
    transport,
  };
  const client = new RpcClient(clientOpts);

  const rsocket = await client.connect();

  const helloService = new HelloClient(rsocket);

  const exec = async () => {
    const name = document.getElementById('name').value;

    const request = new HelloRequest();
    request.setName(name);

    const response = await helloService.sayHello(request);

    console.log(response.getMessage());
  };

  document.getElementById('exec').addEventListener('click', exec);
})();

クライアント側は検索するとサンプルや解説が豊富なので細かい解説は省略。
クライアントをセットアップしてサービスに渡して実行したい処理を呼び出すだけです。
サーバ側の説明でも書きましたがエンコーダの設定はサーバ側と同じものを指定する必要があるので注意が必要です。

実行例

トランスパイルして実行してブラウザからアクセスしてRPCを呼び出すとWebSocketで通信してデータが返ってきてるのが確認できます。

参考リンク