Class | Jabber::Bytestreams::IBB |
In: |
lib/xmpp4r/bytestreams/helper/ibb/base.rb
|
Parent: | Object |
In-Band Bytestreams (JEP-0047) implementation
Don‘t use directly, use IBBInitiator and IBBTarget
In-Band Bytestreams should only be used when transferring very small amounts of binary data, because it is slow and increases server load drastically.
Note that the constructor takes a lot of arguments. In-Band Bytestreams do not specify a way to initiate the stream, this should be done via Stream Initiation.
NS_IBB | = | 'http://jabber.org/protocol/ibb' |
Create a new bytestream
Will register a <message/> callback to intercept data of this stream. This data will be buffered, you can retrieve it with receive
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 26 26: def initialize(stream, session_id, my_jid, peer_jid) 27: @stream = stream 28: @session_id = session_id 29: @my_jid = (my_jid.kind_of?(String) ? JID.new(my_jid) : my_jid) 30: @peer_jid = (peer_jid.kind_of?(String) ? JID.new(peer_jid) : peer_jid) 31: 32: @active = false 33: @seq_send = 0 34: @seq_recv = 0 35: @queue = [] 36: @queue_lock = Mutex.new 37: @pending = Mutex.new 38: @pending.lock 39: @sendbuf = '' 40: @sendbuf_lock = Mutex.new 41: 42: @block_size = 4096 # Recommended by JEP0047 43: end
Close the stream
Waits for acknowledge from peer, may throw ErrorException
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 125 125: def close 126: if active? 127: flush 128: deactivate 129: 130: iq = Iq.new(:set, @peer_jid) 131: close = iq.add REXML::Element.new('close') 132: close.add_namespace IBB::NS_IBB 133: close.attributes['sid'] = @session_id 134: 135: @stream.send_with_id(iq) { |answer| 136: answer.type == :result 137: } 138: end 139: end
Empty the send-buffer by sending remaining data
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 69 69: def flush 70: @sendbuf_lock.synchronize { 71: while @sendbuf.size > 0 72: send_data(@sendbuf[0..@block_size-1]) 73: @sendbuf = @sendbuf[@block_size..-1].to_s 74: end 75: } 76: end
Receive data
Will wait until the Message with the next sequence number is in the stanza queue.
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 83 83: def read 84: if active? 85: res = nil 86: 87: while res.nil? 88: @queue_lock.synchronize { 89: @queue.each { |item| 90: # Find next data 91: if item.type == :data and item.seq == @seq_recv.to_s 92: res = item 93: break 94: # No data? Find close 95: elsif item.type == :close and res.nil? 96: res = item 97: end 98: } 99: 100: @queue.delete_if { |item| item == res } 101: } 102: 103: # No data? Wait for next to arrive... 104: @pending.lock unless res 105: end 106: 107: if res.type == :data 108: @seq_recv += 1 109: @seq_recv = 0 if @seq_recv > 65535 110: res.data 111: elsif res.type == :close 112: deactivate 113: nil # Closed 114: end 115: else 116: nil 117: end 118: end
Send data
Data is buffered to match block_size in each packet. If you need the data to be sent immediately, use flush afterwards.
buf: | [String] |
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 56 56: def write(buf) 57: @sendbuf_lock.synchronize { 58: @sendbuf += buf 59: 60: while @sendbuf.size >= @block_size 61: send_data(@sendbuf[0..@block_size-1]) 62: @sendbuf = @sendbuf[@block_size..-1].to_s 63: end 64: } 65: end
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 179 179: def activate 180: unless active? 181: @stream.add_message_callback(200, self) { |msg| 182: data = msg.first_element('data') 183: if msg.from == @peer_jid and msg.to == @my_jid and data and data.attributes['sid'] == @session_id 184: if msg.type == nil 185: @queue_lock.synchronize { 186: @queue.push IBBQueueItem.new(:data, data.attributes['seq'], data.text.to_s) 187: @pending.unlock 188: } 189: elsif msg.type == :error 190: @queue_lock.synchronize { 191: @queue << IBBQueueItem.new(:close) 192: @pending.unlock 193: } 194: end 195: true 196: else 197: false 198: end 199: } 200: 201: @stream.add_iq_callback(200, self) { |iq| 202: close = iq.first_element('close') 203: if iq.type == :set and close and close.attributes['sid'] == @session_id 204: answer = iq.answer(false) 205: answer.type = :result 206: @stream.send(answer) 207: 208: @queue_lock.synchronize { 209: @queue << IBBQueueItem.new(:close) 210: @pending.unlock 211: } 212: true 213: else 214: false 215: end 216: } 217: 218: @active = true 219: end 220: end
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 222 222: def deactivate 223: if active? 224: @stream.delete_message_callback(self) 225: @stream.delete_iq_callback(self) 226: 227: @active = false 228: end 229: end
Send data directly
data: | [String] |
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 146 146: def send_data(databuf) 147: if active? 148: msg = Message.new 149: msg.from = @my_jid 150: msg.to = @peer_jid 151: 152: data = msg.add REXML::Element.new('data') 153: data.add_namespace NS_IBB 154: data.attributes['sid'] = @session_id 155: data.attributes['seq'] = @seq_send.to_s 156: data.text = Base64::encode64 databuf 157: 158: # TODO: Implement AMP correctly 159: amp = msg.add REXML::Element.new('amp') 160: amp.add_namespace 'http://jabber.org/protocol/amp' 161: deliver_at = amp.add REXML::Element.new('rule') 162: deliver_at.attributes['condition'] = 'deliver-at' 163: deliver_at.attributes['value'] = 'stored' 164: deliver_at.attributes['action'] = 'error' 165: match_resource = amp.add REXML::Element.new('rule') 166: match_resource.attributes['condition'] = 'match-resource' 167: match_resource.attributes['value'] = 'exact' 168: match_resource.attributes['action'] = 'error' 169: 170: @stream.send(msg) 171: 172: @seq_send += 1 173: @seq_send = 0 if @seq_send > 65535 174: else 175: raise 'Attempt to send data when not activated' 176: end 177: end