Redis
 sql >> Base de données >  >> NoSQL >> Redis

Accéder à une variable dans un thread de rails

MODIFICATION MISE À JOUR À LA FIN :affiche le code de travail. Module principal non modifié à l'exception du code de débogage. Remarque :J'ai rencontré le problème que j'ai déjà signalé concernant la nécessité de se désabonner avant la résiliation.

Le code semble correct. J'aimerais voir comment vous l'instanciez.

Dans config/application.rb, vous avez probablement au moins quelque chose comme :

require 'ws_communication'
config.middleware.use WsCommunication

Ensuite, dans votre client JavaScript, vous devriez avoir quelque chose comme ceci :

var ws = new WebSocket(uri);

Instanciez-vous une autre instance de WsCommunication ? Cela définirait @clients sur un tableau vide et pourrait présenter vos symptômes. Quelque chose comme ceci serait incorrect :

var ws = new WsCommunication;

Cela nous aiderait si vous montriez le client et, peut-être, config/application.rb si ce message ne vous aide pas.

Soit dit en passant, je suis d'accord avec le commentaire selon lequel @clients devrait être protégé par un mutex sur toute mise à jour, s'il n'est pas lu également. C'est une structure dynamique qui peut changer à tout moment dans un système événementiel. redis-mutex est une bonne option. (J'espère que ce lien est correct car Github semble lancer 500 erreurs sur tout en ce moment.)

Vous pouvez également noter que $redis.publish renvoie une valeur entière du nombre de clients qui ont reçu le message.

Enfin, vous devrez peut-être vous assurer que votre chaîne est désabonnée avant la résiliation. J'ai eu des situations où j'ai fini par envoyer chaque message plusieurs fois, voire plusieurs fois, à cause d'abonnements antérieurs au même canal qui n'ont pas été nettoyés. Étant donné que vous vous abonnez à la chaîne dans un fil, vous devrez vous désabonner dans ce même fil ou le processus "se bloquera" en attendant que le bon fil apparaisse comme par magie. Je gère cette situation en définissant un drapeau "désabonnement" puis en envoyant un message. Ensuite, dans le bloc on.message, je teste l'indicateur de désabonnement et j'émets le désabonnement ici.

Le module que vous avez fourni, avec seulement des modifications de débogage mineures :

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#{@clients.count};"
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#{@clients.count};"
      end

      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#{event.data}; Receivers:#{receivers};"
      end

      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#{@clients.count};"
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

Le code d'abonné test que j'ai fourni :

# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do

  ws = WebSocket::Client::Simple.connect url

  ws.on :message do |msg|
    puts msg
  end

  ws.on :open do
    puts "-- Subscriber open (#{ws.url})"
  end

  ws.on :close do |e|
    puts "-- Subscriber close (#{e.inspect})"
    exit 1
  end

  ws.on :error do |e|
    puts "-- Subscriber error (#{e.inspect})"
  end

end

Le code d'éditeur de test que j'ai fourni. L'éditeur et l'abonné peuvent facilement être combinés, car ce ne sont que des tests :

# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send({"MESSAGE": "COUNT:#{count};"})
  end

  @ws = WebSocket::Client::Simple.connect url

  @ws.on :message do |msg|
    puts msg
  end

  @ws.on :open do
    puts "-- Publisher open"
  end

  @ws.on :close do |e|
    puts "-- Publisher close (#{e.inspect})"
    exit 1
  end

  @ws.on :error do |e|
    puts "-- Publisher error (#{e.inspect})"
    @ws.close
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end

Un exemple de config.ru qui exécute tout cela au niveau de la couche middleware du rack :

require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new

C'est principal. Je l'ai supprimé de ma version en cours d'exécution, il faudra donc peut-être le peaufiner si vous l'utilisez :

%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

  class Main < Sinatra::Base

    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      @scheme = env == "production" ? "wss://" : "ws://"
      erb :"application.js"
    end
  end