monolithic kernel

JavaScriptでTwitterのUser Streams APIを利用する

User Streams APIをJavaScriptから利用する方法についての情報が非常に少なかったのでメモ。利用にはXMLHttpRequestでクロスドメイン通信可能な環境が必要です。今回はGoogle Chromeの拡張機能にあるBackground Pageを前提として説明します。

OAuth認証

OAuth認証は以前の記事でTwitter APIの呼び出しとして説明している方法と同じです。

var oauth = ChromeExOAuth.initBackgroundPage({
  'request_url': 'https://twitter.com/oauth/request_token',
  'authorize_url': 'https://twitter.com/oauth/authorize',
  'access_url': 'https://twitter.com/oauth/access_token',
  'consumer_key': '',
  'consumer_secret': ''
});

接続

接続は普通にXMLHttpRequestを利用すれば問題ありません。

問題になるのはデータを受信する方法です。他の言語でUser Streams APIを利用する場合、データを受け取るたびにコールバック関数が呼ばれ、そこで新たに受信したデータを処理するという方法が利用されますが、JavaScriptではこの方法を利用することができません。

そのため、XMLHttpRequestオブジェクトのresponseTextを定期的に確認し、新着データが無いか確認することで接続を維持したまま処理していきます。また、これだけだとresponseTextがどんどん伸びてメモリを大量に消費してしまうため、ある程度の量を受信したら接続を切断してresponseTextを解放してやる必要があります。

切断・再接続

User Streamsの接続が切断された場合、再接続して再びデータを受信できるように復帰する必要がありますが、何も考えずに再接続するような実装だとTwitter側でエラーが発生した場合にTwitterに攻撃を仕掛けているような状態になってしまいます。

そのため、エラーによって切断された場合には少し待ってから再接続するようにします。再接続までの待ち時間は公式のドキュメントで示されており、初回は20から40秒で、連続して失敗するたびに待ち時間を倍にしていき、最大で240から300秒待つとされています。

ソースコード

これらの点を踏まえて実装してみたのが以下のコードです。やり方が分かってしまえば大したことはないですね。

var userStreamUrl = 'https://userstream.twitter.com/2/user.json';
oauth.authorize(function() {
  var wait = 0;
  var connectionError = false;
  var connect = function() {
    // XMLHttpRequestを構築
    var method = 'GET';
    var xhr = new XMLHttpRequest();
    var signedUrl = oauth.signURL(userStreamUrl, method, { oauth_version: '1.0' });
    xhr.open(method, signedUrl, true);

    var items = 0;
    var offset = 0;
    var elapsed = 0;
    var interval = setInterval(function() {
      // 何も受信せずに90秒経過した場合
      if (++elapsed > 90) {
        // エラーとして切断
        connectionError = true;
        xhr.abort();
        return;
      }

      // 500件を超えるアイテムを受信した場合
      if (items > 500) {
        // メモリを開放するため切断
        connectionError = false;
        xhr.abort();
        return;
      }

      var responseText = xhr.responseText;
      for (;;) {
        // 開始位置から \r までの範囲を切り出す
        var index = responseText.indexOf("\r", offset);
        if (index == -1) {
          break;
        }
        var line = responseText.substr(offset, index - offset);

        // 接続維持用の空行でない場合
        if (line.length >= 2) {
          // JSONとしてパース
          var item = JSON.parse(line);
          console.log(item); // itemを利用して処理を行う
          ++items;
        }

        // 次に読み出す位置を設定
        offset = index + 2;
        // 経過時間をリセット
        elapsed = 0;
      }
    }, 1000);

    xhr.onreadystatechange = function(data) {
      // 通信が終了した場合
      if (xhr.readyState == 4) {
        // responseTextの監視を終了
        clearInterval(interval);

        // エラーだった場合
        if (connectionError || xhr.status != 200) {
          // 待ち時間を 20, 40, 80, 160, 320, 320, ... [sec] と変化させる
          if (wait == 0) {
            wait = 20000;
          } else if (wait < 240000) {
            wait *= 2;
          }
          connectionError = false;
        } else {
          // エラーでない場合は待ち時間を 0 にする
          wait = 0;
        }

        // 再接続
        setTimeout(connect, wait);
      }
    };

    xhr.send(null);
  };

  // 接続
  connect();
});

おわりに

拙作のTwitterRealtimeNotificationというChrome拡張でこの手法を実際に利用しているので、合わせて見ると分かりやすいかもしれません。

GitHub