RSocket RPCをJavaScriptで試す
Node.jsでRSocket RPCのサーバを試してみようと思ったらドキュメントが見当たらずソースとか誰かの実装サンプル見ながら使い方調べてみたのでまとめました。
RSocketとは
RSocket
とはReactiveStreamsをサポートしたアプリケーションプロトコルです。
RSocketの概要に関してはSpring Fest 2019
で@making
さんが発表した資料がわかりやすいのでおすすめです。
https://bit.ly/jsug-rsocket
RSocket RPCとは
RSocketは普通に実装する場合はメッセージやり取りして中身見てハンドリングして処理を行います。
素のRSocketとは別にgRPCっぽい感じでProtocol Buffersからコードを生成してRPC風に処理を呼び出せるRSocket RPCが提供されています。
- rsocket/rsocket-rpc-java
- rsocket/rsocket-rpc-js
- rsocket/rsocket-rpc-net
- rsocket/rsocket-rpc-kotlin
- rsocket/rsocket-rpc-go
RSocket自体はTransportにWebSocketが使えるのでブラウザからもRSocket RPCが利用できWebSocketなのでWebサーバと同一ポートで運用することも可能です。
このあと出てくるサンプルの前提
- サーバ、クライアント共にJavaScript
- SPAを想定してサーバ側はWebサーバ兼RSocket RPCサーバな構成で同一ポートでリクエストを受け付ける
コードはGitHub に上がってるのでコード見たほうが早い派の人はそちらをどうぞ。
依存のインストール
protoc
Protocol Buffersの定義ファイルからコードを生成するのに使います。
環境に合わせた方法でインストールする必要がありますがMacならHomebrewから楽にインストールできます。
$ brew install protobuf
rsocket-rpc-protobuf
RSocket RPCのProtocol Buffers関連ユーティリティパッケージでprotocのプラグインやRSocket RPC向けのProtocol Buffersの定義ファイルが含まれていてコードを生成する時に使います。
$ yarn add -D rsocket-rpc-protobuf
上記のパッケージとは別に生成されるコードの中でrequire
されているパッケージがいくつかあるので個別でインストールする必要があります。
今回の例ではメトリクスやトレーシングはやってないので関係ないのですがエラーになるので必要です。
$ yarn add google-protobuf rsocket-flowable rsocket-rpc-metrics rsocket-rpc-tracing
rsocket-rpc-core
RSocket RPCの諸々パッケージです。クライアントのクラスとリクエストハンドラなどのライブラリが含まれています。
$ yarn add rsocket-rpc-core
rsocket-websocket-server
Transportに使うWebSocketのサーバ側のライブラリです。
$ yarn add rsocket-websocket-server
rsocket-websocket-client
Transportに使うWebSocketのクライアント側のライブラリです。
$ yarn add rsocket-websocket-client
その他諸々
今回の例ではExpressでWebサーバを動かしますがこの辺はお好みで。
$ yarn add express
クライアント側も実装する場合はトランスパイルが必要です。
$ yarn add -D webpack
Protocol Buffersの定義ファイル
まずリクエストパラメータとレスポンスとそれらを扱うサービスを定義します。
以下の例はRequest Responseでリクエストしてレスポンスを受け取る例でほぼgRPCと同じです。
Fire and Forgetなど他のパターンの場合だとRSocket RPCっぽい感じの記述も必要になります。
# helloworld.proto
syntax = "proto3";
package helloworld;
service Hello {
rpc sayHello (HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
定義ファイルが書けたらコードをビルドします。
$ protoc --proto_path=. --js_out=import_style=commonjs,binary:. --rsocket_rpc_out=. --plugin=protoc-gen-rsocket_rpc=node_modules/.bin/rsocket_rpc_js_protoc_plugin helloworld.proto
--proto_path
と--js_out
あたりはJavaScript向けにProtocol Buffersのコード生成する時の一般的なオプションなので細かい説明は省略。
--rsocket_rpc_out
でRSocket RPC用の生成されるコードの出力先を指定します。
--plugin
でRSocket RPC用のコードを生成するためのプラグインのパスを指定します。
プラグインは依存で追加したrsocket-rpc-protobuf
に含まれていてnode_modules/.bin/
にバイナリへのシンボリックリンクがあるのでそれをパスに指定します。
生成に成功すると以下のようなファイルが生成されます。
helloworld_pb.js
helloworld_rsocket_pb.js
サーバ側
処理の実装
RPCで呼ばれる処理を実装します。
Javaだとprotocでインターフェイスが生成されますがJavaScript向けにはそういった気の利いたものは提供されないようなので自力でお作法通りの挙動をするコードを用意する必要があります。
const { HelloResponse } = require('./protos/helloworld_pb');
const { Single } = require('rsocket-flowable');
class HelloService {
constructor() {}
sayHello(message, metadata) {
const name = message.getName() || 'world';
let response = new HelloResponse();
response.setMessage(`Hello ${name}!`);
return Single.of(response);
}
};
ポイントはこんな感じです。
- 定義ファイルに記述したRPC名のメソッド
- 第一引数はRPCのリクエストパラメータのメッセージ
- 第二引数はメタデータ
- Request Responseの場合は
Single
、Streamを返す場合はFlowable
にレスポンスのメッセージを突っ込んで返す
上記の例だとクラスで実装してますがオブジェクトのRPC名のプロパティを関数として呼び出しするだけっぽいので多分ただのオブジェクトでも大丈夫です。
リクエストハンドラの実装
リクエストが来た時に呼び出されるサービスを登録します。
const { RequestHandlingRSocket } = require('rsocket-rpc-core');
const HelloService = require('./HelloService');
const { HelloServer } = require('./helloworld_rsocket_pb');
const getRequestHandler = () => {
// ハンドラ用のライブラリ
const service = new RequestHandlingRSocket();
// 処理を書いたサービス(上で書いたクラス)
const helloService = new HelloService();
// ハンドラに登録
// 第一引数 {パッケージ名}.{サービス名}の文字列
// 第二引数 自動生成されたクラスにサービス渡してnewしたやつ
service.addService('helloworld.Hello', new HelloServer(helloService));
return service;
};
ややこしいので大まかな処理はサンプルコードを参照してください。
{サービス名}Server
という名前で自動生成されるクラスですがサーバでもなんでも無くて、RSocketのリクエストデータからどのメソッドを呼ぶかのハンドリング処理を行ったり、レスポンスやエラーの処理を行っているだけみたいです。
単一サービスの場合はそのままreturn
しても動くのですが、複数サービスの場合はすべて呼び出せるようにする必要があるためサービスのハンドリングを行ってくれるRequestHandlingRSocket
を使います。
Express周りの実装
今回の例は同一ポートでWebサーバとRSocket RPCのサーバを動かす例なのでExpressでWebサーバの部分を実装します。
http.createServer()
に突っ込める系のフレームワークなら多分なんでも大丈夫なのでExpressである必要は無いです。
const express = require('express');
const http = require('http');
const app = express();
app.use(express.static('public'));
const server = http.createServer(app);
ただのWebサーバならapp.listen()
を呼べばサーバが起動しますが、WebSocketと連携するのでhttp.createServer()
に突っ込んでhttp.Server
のインスタンスを受け取るのがポイントです。
RSocket RPCサーバの実装
残りのRSocket RPCサーバの部分の実装です。
const { RSocketServer, BufferEncoders } = require('rsocket-core');
const RSocketWebsocketServer = require('rsocket-websocket-server').default;
const { RequestHandlingRSocket } = require('rsocket-rpc-core');
// 【中略】上記のExpress周りの実装
const transportOpts = {
server,
path: '/rsocket'
};
const transport = new RSocketWebsocketServer(transportOpts, BufferEncoders);
// 【中略】上記のリクエストハンドラの実装
const rSocketServer = new RSocketServer({ transport, getRequestHandler });
まず、TransportのRSocketWebsocketServer
のインスタンスを作成します。
第一引数のオプションはws
のサーバのオプションになっていてここからWebSocketの設定を行います。
必須なのはhttp.Server
のインスタンスとリクエストを受け付けるパスでその他の設定はお好みで。
第二引数はクライアントとのやり取りに使うエンコードが選べるらしくそれのエンコーダを設定します。
クライアント側でも同様の設定を行うのですが必ずクライアント側と同じエンコーダを設定する必要があります。
違うエンコーダを設定してしまったとしても親切なエラーなんて出なくてJavaScriptの未定義プロパティへのアクセスのエラーとかがでてデバッグで苦しむことになるので注意してください。
Transportの設定ができたらRSocketServer
にRSocketWebsocketServer
のインスタンスとリクエストハンドラを定義した関数を渡してインスタンスを作成します。
サーバ起動
最後にサーバを起動します。
Webサーバが起動しただけだとRSocketのサーバが起動してくれないので別で起動用のメソッドを呼ぶ必要があります。
server.listen(3000, () => {
rSocketServer.start();
console.log('Server started on port 3000');
});
クライアント側
import RSocketWebSocketClient from 'rsocket-websocket-client';
import { BufferEncoders } from 'rsocket-core';
import { RpcClient } from 'rsocket-rpc-core';
import { HelloClient } from './protos/helloworld_rsocket_pb';
import { HelloRequest } from './protos/helloworld_pb';
(async () => {
const transportOpts = { url: 'ws://localhost:3000/rsocket' };
const transport = new RSocketWebSocketClient(transportOpts, BufferEncoders);
const clientOpts = {
setup: {
keepAlive: 10000,
lifetime: 20000,
},
transport,
};
const client = new RpcClient(clientOpts);
const rsocket = await client.connect();
const helloService = new HelloClient(rsocket);
const exec = async () => {
const name = document.getElementById('name').value;
const request = new HelloRequest();
request.setName(name);
const response = await helloService.sayHello(request);
console.log(response.getMessage());
};
document.getElementById('exec').addEventListener('click', exec);
})();
クライアント側は検索するとサンプルや解説が豊富なので細かい解説は省略。
クライアントをセットアップしてサービスに渡して実行したい処理を呼び出すだけです。
サーバ側の説明でも書きましたがエンコーダの設定はサーバ側と同じものを指定する必要があるので注意が必要です。
実行例
トランスパイルして実行してブラウザからアクセスしてRPCを呼び出すとWebSocketで通信してデータが返ってきてるのが確認できます。