source: trunk/sources/thelib/src/protocols/rtmp/basertmpprotocol.cpp @ 765

Revision 765, 27.0 KB checked in by shiretu, 13 days ago (diff)

-- fixes from EvoStream?

Line 
1/*
2 *  Copyright (c) 2010,
3 *  Gavriloaie Eugen-Andrei (shiretu@gmail.com)
4 *
5 *  This file is part of crtmpserver.
6 *  crtmpserver is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  crtmpserver is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with crtmpserver.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20
21#ifdef HAS_PROTOCOL_RTMP
22#include "application/clientapplicationmanager.h"
23#include "netio/netio.h"
24#include "protocols/protocolmanager.h"
25#include "protocols/rtmp/basertmpprotocol.h"
26#include "protocols/rtmp/basertmpappprotocolhandler.h"
27#include "protocols/rtmp/messagefactories/messagefactories.h"
28#include "protocols/rtmp/streaming/baseoutnetrtmpstream.h"
29#include "streaming/streamstypes.h"
30#include "protocols/rtmp/streaming/innetrtmpstream.h"
31#include "protocols/rtmp/streaming/infilertmpstream.h"
32#include "protocols/rtmp/streaming/rtmpstream.h"
33#include "streaming/streamstypes.h"
34#include "protocols/rtmp/monitorrtmpprotocol.h"
35#include "protocols/rtmp/sharedobjects/so.h"
36
37#define MAX_RTMP_OUTPUT_BUFFER 1024*256
38
39uint8_t BaseRTMPProtocol::genuineFMSKey[] = {
40        0x47, 0x65, 0x6e, 0x75, 0x69, 0x6e, 0x65, 0x20,
41        0x41, 0x64, 0x6f, 0x62, 0x65, 0x20, 0x46, 0x6c,
42        0x61, 0x73, 0x68, 0x20, 0x4d, 0x65, 0x64, 0x69,
43        0x61, 0x20, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72,
44        0x20, 0x30, 0x30, 0x31, // Genuine Adobe Flash Media Server 001
45        0xf0, 0xee, 0xc2, 0x4a, 0x80, 0x68, 0xbe, 0xe8,
46        0x2e, 0x00, 0xd0, 0xd1, 0x02, 0x9e, 0x7e, 0x57,
47        0x6e, 0xec, 0x5d, 0x2d, 0x29, 0x80, 0x6f, 0xab,
48        0x93, 0xb8, 0xe6, 0x36, 0xcf, 0xeb, 0x31, 0xae
49}; // 68
50
51uint8_t BaseRTMPProtocol::genuineFPKey[] = {
52        0x47, 0x65, 0x6E, 0x75, 0x69, 0x6E, 0x65, 0x20,
53        0x41, 0x64, 0x6F, 0x62, 0x65, 0x20, 0x46, 0x6C,
54        0x61, 0x73, 0x68, 0x20, 0x50, 0x6C, 0x61, 0x79,
55        0x65, 0x72, 0x20, 0x30, 0x30, 0x31, // Genuine Adobe Flash Player 001
56        0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8,
57        0x2E, 0x00, 0xD0, 0xD1, 0x02, 0x9E, 0x7E, 0x57,
58        0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB,
59        0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
60}; // 62
61
62BaseRTMPProtocol::BaseRTMPProtocol(uint64_t protocolType)
63: BaseProtocol(protocolType) {
64        _handshakeCompleted = false;
65        _rtmpState = RTMP_STATE_NOT_INITIALIZED;
66        //TODO: Make use of winacksize which is in fact the value setted up for
67        //the nex bytes sent report
68        _winAckSize = RECEIVED_BYTES_COUNT_REPORT_CHUNK;
69        _nextReceivedBytesCountReport = _winAckSize;
70        for (uint32_t i = 0; i < MAX_CHANNELS_COUNT; i++) {
71                _channels[i].id = i;
72                _channels[i].Reset();
73        }
74        _selectedChannel = -1;
75        _inboundChunkSize = 128;
76        _outboundChunkSize = 128;
77
78        for (uint32_t i = 0; i < MAX_STREAMS_COUNT; i++) {
79                _streams[i] = NULL;
80        }
81
82        for (uint32_t i = MIN_AV_CHANNLES; i < MAX_AV_CHANNLES; i++)
83                ADD_VECTOR_END(_channelsPool, i);
84
85        _pSignaledRTMPOutNetStream = NULL;
86        _rxInvokes = 0;
87        _txInvokes = 0;
88
89#ifdef ENFORCE_RTMP_OUTPUT_CHECKS
90        _pMonitor = new MonitorRTMPProtocol(MAX_STREAMS_COUNT, MAX_CHANNELS_COUNT);
91#endif /* ENFORCE_RTMP_OUTPUT_CHECKS */
92}
93
94BaseRTMPProtocol::~BaseRTMPProtocol() {
95        for (uint32_t i = 0; i < MAX_STREAMS_COUNT; i++) {
96                if (_streams[i] != NULL) {
97                        delete _streams[i];
98                        _streams[i] = NULL;
99                }
100        }
101        LinkedListNode<BaseOutNetRTMPStream *> *pTemp = _pSignaledRTMPOutNetStream;
102        while (pTemp != NULL) {
103                pTemp = RemoveLinkedList<BaseOutNetRTMPStream *> (pTemp);
104        }
105        _pSignaledRTMPOutNetStream = NULL;
106        while (_inFileStreams.size() > 0) {
107                InFileRTMPStream *pStream = MAP_VAL(_inFileStreams.begin());
108                delete pStream;
109                _inFileStreams.erase(pStream);
110        }
111#ifdef ENFORCE_RTMP_OUTPUT_CHECKS
112        if (_pMonitor != NULL) {
113                delete _pMonitor;
114                _pMonitor = NULL;
115        }
116#endif /* ENFORCE_RTMP_OUTPUT_CHECKS */
117
118        FOR_MAP(_sos, string, ClientSO *, i) {
119                delete MAP_VAL(i);
120        }
121        _sos.clear();
122}
123
124ClientSO *BaseRTMPProtocol::GetSO(string &name) {
125        map<string, ClientSO *>::iterator i = _sos.find(name);
126        if (i == _sos.end())
127                return NULL;
128        return MAP_VAL(i);
129}
130
131bool BaseRTMPProtocol::CreateSO(string &name) {
132        if (GetType() != PT_OUTBOUND_RTMP) {
133                FATAL("Incorrect RTMP protocol type for opening SO");
134                return false;
135        }
136        if (GetSO(name) != NULL) {
137                FATAL("So already present");
138                return false;
139        }
140        _sos[name] = new ClientSO();
141        _sos[name]->name(name);
142        _sos[name]->version(1);
143        return true;
144}
145
146void BaseRTMPProtocol::SignalBeginSOProcess(string &name) {
147
148}
149
150void BaseRTMPProtocol::SignalEndSOProcess(string &name, uint32_t versionNumber) {
151        ClientSO *pSO = NULL;
152        if (!MAP_HAS1(_sos, name)) {
153                //FATAL("Client SO %s not found", STR(name));
154                return;
155        }
156        pSO = _sos[name];
157        pSO->version(versionNumber);
158        if (pSO->changedProperties().MapSize() == 0)
159                return;
160        _pProtocolHandler->SignalClientSOUpdated(this, pSO);
161        pSO->changedProperties().RemoveAllKeys();
162        return;
163}
164
165bool BaseRTMPProtocol::ClientSOSend(string &name, Variant &parameters) {
166        ClientSO *pSO = NULL;
167        if (!MAP_HAS1(_sos, name)) {
168                FATAL("Client SO %s not found", STR(name));
169                return false;
170        }
171        pSO = _sos[name];
172        Variant message = SOMessageFactory::GetSharedObject(3, 0, 0, false, name,
173                        pSO->version(), pSO->persistent());
174        SOMessageFactory::AddSOPrimitiveSend(message, parameters);
175        return SendMessage(message);
176}
177
178bool BaseRTMPProtocol::ClientSOSetProperty(string &soName, string &propName,
179                Variant &propValue) {
180        ClientSO *pSO = NULL;
181        if (!MAP_HAS1(_sos, soName)) {
182                FATAL("Client SO %s not found", STR(soName));
183                return false;
184        }
185        pSO = _sos[soName];
186        Variant message = SOMessageFactory::GetSharedObject(3, 0, 0, false, soName,
187                        pSO->version(), pSO->persistent());
188        SOMessageFactory::AddSOPrimitiveSetProperty(message, propName, propValue);
189        if (!SendMessage(message)) {
190                FATAL("Unable to set property value");
191                return false;
192        }
193        pSO->changedProperties().PushToArray(propName);
194        if ((propValue == V_NULL) || (propValue == V_UNDEFINED))
195                pSO->properties().RemoveKey(propName);
196        else
197                pSO->properties()[propName] = propValue;
198        _pProtocolHandler->SignalClientSOUpdated(this, pSO);
199        pSO->changedProperties().RemoveAllKeys();
200        return true;
201}
202
203bool BaseRTMPProtocol::HandleSOPrimitive(string &name, Variant &primitive) {
204        ClientSO *pSO = NULL;
205        if (!MAP_HAS1(_sos, name)) {
206                FATAL("Client SO %s not found", STR(name));
207                return false;
208        }
209        pSO = _sos[name];
210        switch ((uint8_t) primitive[RM_SHAREDOBJECTPRIMITIVE_TYPE]) {
211                case SOT_CS_UPDATE_FIELD:
212                case SOT_SC_INITIAL_DATA:
213                {
214
215                        FOR_MAP(primitive[RM_SHAREDOBJECTPRIMITIVE_PAYLOAD], string, Variant, i) {
216                                pSO->properties()[MAP_KEY(i)] = MAP_VAL(i);
217                                pSO->changedProperties().PushToArray(MAP_KEY(i));
218                        }
219                        if ((uint8_t) primitive[RM_SHAREDOBJECTPRIMITIVE_TYPE] == SOT_SC_INITIAL_DATA) {
220                                _pProtocolHandler->SignalClientSOConnected(this, pSO);
221                        }
222                        return true;
223                }
224                case SOT_SC_CLEAR_DATA:
225                {
226
227                        FOR_MAP(pSO->properties(), string, Variant, i) {
228                                pSO->changedProperties().PushToArray(MAP_KEY(i));
229                        }
230                        pSO->properties().RemoveAllKeys();
231                        return true;
232                }
233                case SOT_SC_DELETE_FIELD:
234                {
235
236                        FOR_MAP(primitive[RM_SHAREDOBJECTPRIMITIVE_PAYLOAD], string, Variant, i) {
237                                pSO->properties().RemoveKey((string) MAP_VAL(i));
238                                pSO->changedProperties().PushToArray(MAP_VAL(i));
239                        }
240                        return true;
241                }
242                case SOT_BW_SEND_MESSAGE:
243                {
244                        _pProtocolHandler->SignalClientSOSend(this, pSO,
245                                        primitive[RM_SHAREDOBJECTPRIMITIVE_PAYLOAD]);
246                        return true;
247                }
248                case SOT_CS_UPDATE_FIELD_ACK:
249                {
250                        return true;
251                }
252                default:
253                {
254                        FATAL("Primitive not supported\n%s", STR(primitive.ToString()));
255                        return false;
256                }
257        }
258        return true;
259}
260
261bool BaseRTMPProtocol::Initialize(Variant &parameters) {
262        GetCustomParameters() = parameters;
263        return true;
264}
265
266bool BaseRTMPProtocol::AllowFarProtocol(uint64_t type) {
267        if (type == PT_TCP
268                        || type == PT_RTMPE
269                        || type == PT_INBOUND_SSL
270                        || type == PT_INBOUND_HTTP_FOR_RTMP)
271                return true;
272        return false;
273}
274
275bool BaseRTMPProtocol::AllowNearProtocol(uint64_t type) {
276        FATAL("This protocol doesn't allow any near protocols");
277        return false;
278}
279
280IOBuffer * BaseRTMPProtocol::GetOutputBuffer() {
281        if (GETAVAILABLEBYTESCOUNT(_outputBuffer) > 0)
282                return &_outputBuffer;
283        return NULL;
284}
285
286bool BaseRTMPProtocol::SignalInputData(int32_t recvAmount) {
287        ASSERT("OPERATION NOT SUPPORTED");
288        return false;
289}
290
291bool BaseRTMPProtocol::SignalInputData(IOBuffer &buffer) {
292        if (_enqueueForDelete)
293                return true;
294
295        bool result = false;
296        if (_handshakeCompleted) {
297                result = ProcessBytes(buffer);
298                uint64_t decodedBytes = GetDecodedBytesCount();
299                if (result && (decodedBytes >= _nextReceivedBytesCountReport)) {
300                        Variant _bytesReadMessage = GenericMessageFactory::GetAck(decodedBytes);
301                        _nextReceivedBytesCountReport += _winAckSize;
302                        if (!SendMessage(_bytesReadMessage)) {
303                                FATAL("Unable to send\n%s", STR(_bytesReadMessage.ToString()));
304                                return false;
305                        }
306                }
307        } else {
308                result = PerformHandshake(buffer);
309                if (!result) {
310                        FATAL("Unable to perform handshake");
311                        return false;
312                }
313                if (_handshakeCompleted) {
314                        result = SignalInputData(buffer);
315                        if (result && (GetType() == PT_OUTBOUND_RTMP)) {
316                                result = _pProtocolHandler->OutboundConnectionEstablished(
317                                                (OutboundRTMPProtocol *) this);
318                        }
319                }
320        }
321        return result;
322}
323
324bool BaseRTMPProtocol::TimePeriodElapsed() {
325        ASSERT("Operation not supported");
326        return false;
327}
328
329void BaseRTMPProtocol::ReadyForSend() {
330        LinkedListNode<BaseOutNetRTMPStream *> *pTemp = _pSignaledRTMPOutNetStream;
331        while (pTemp != NULL) {
332                pTemp->info->ReadyForSend();
333                pTemp = pTemp->pPrev;
334        }
335}
336
337void BaseRTMPProtocol::SetApplication(BaseClientApplication *pApplication) {
338        BaseProtocol::SetApplication(pApplication);
339        if (pApplication != NULL) {
340                _pProtocolHandler = (BaseRTMPAppProtocolHandler *)
341                                pApplication->GetProtocolHandler(this);
342        } else {
343                _pProtocolHandler = NULL;
344        }
345}
346
347void BaseRTMPProtocol::GetStats(Variant &info, uint32_t namespaceId) {
348        BaseProtocol::GetStats(info, namespaceId);
349        info["rxInvokes"] = _rxInvokes;
350        info["txInvokes"] = _txInvokes;
351        for (uint32_t i = 0; i < MAX_STREAMS_COUNT; i++) {
352                if (_streams[i] != NULL) {
353                        Variant si;
354                        _streams[i]->GetStats(si, namespaceId);
355                        info["streams"].PushToArray(si);
356                }
357        }
358
359        FOR_MAP(_inFileStreams, InFileRTMPStream *, InFileRTMPStream *, i) {
360                Variant si;
361                MAP_VAL(i)->GetStats(si, namespaceId);
362                info["streams"].PushToArray(si);
363        }
364}
365
366bool BaseRTMPProtocol::ResetChannel(uint32_t channelId) {
367        if (channelId >= MAX_CHANNELS_COUNT) {
368                FATAL("Invalid channel id in reset message: %"PRIu32, channelId);
369                return false;
370        }
371        _channels[channelId].Reset();
372        return true;
373}
374
375bool BaseRTMPProtocol::SendMessage(Variant & message) {
376#ifdef ENFORCE_RTMP_OUTPUT_CHECKS
377        _intermediateBuffer.IgnoreAll();
378        if (!_rtmpProtocolSerializer.Serialize(_channels[(uint32_t) VH_CI(message)],
379                        message, _intermediateBuffer, _outboundChunkSize)) {
380                FATAL("Unable to serialize RTMP message");
381                return false;
382        }
383        if (!_pMonitor->Feed(_intermediateBuffer)) {
384                ASSERT("Server sent invalid data");
385        }
386        _outputBuffer.ReadFromBuffer(
387                        GETIBPOINTER(_intermediateBuffer),
388                        GETAVAILABLEBYTESCOUNT(_intermediateBuffer));
389#else  /* ENFORCE_RTMP_OUTPUT_CHECKS */
390        //2. Send the message
391        if (!_rtmpProtocolSerializer.Serialize(_channels[(uint32_t) VH_CI(message)],
392                        message, _outputBuffer, _outboundChunkSize)) {
393                FATAL("Unable to serialize RTMP message");
394                return false;
395        }
396#endif  /* ENFORCE_RTMP_OUTPUT_CHECKS */
397
398        _txInvokes++;
399
400        //3. Mark the connection as ready for outbound transfer
401        return EnqueueForOutbound();
402}
403
404bool BaseRTMPProtocol::SendRawData(Header &header, Channel &channel, uint8_t *pData,
405                uint32_t length) {
406#ifdef ENFORCE_RTMP_OUTPUT_CHECKS
407        _intermediateBuffer.IgnoreAll();
408        if (!header.Write(channel, _intermediateBuffer)) {
409                FATAL("Unable to serialize message header");
410                return false;
411        }
412
413        _intermediateBuffer.ReadFromBuffer(pData, length);
414        if (!_pMonitor->Feed(_intermediateBuffer)) {
415                ASSERT("Server sent invalid data");
416        }
417        _outputBuffer.ReadFromBuffer(
418                        GETIBPOINTER(_intermediateBuffer),
419                        GETAVAILABLEBYTESCOUNT(_intermediateBuffer));
420#else /* ENFORCE_RTMP_OUTPUT_CHECKS */
421        if (!header.Write(channel, _outputBuffer)) {
422                FATAL("Unable to serialize message header");
423                return false;
424        }
425
426        _outputBuffer.ReadFromBuffer(pData, length);
427#endif /* ENFORCE_RTMP_OUTPUT_CHECKS */
428        return EnqueueForOutbound();
429}
430
431bool BaseRTMPProtocol::SendRawData(uint8_t *pData, uint32_t length) {
432#ifdef ENFORCE_RTMP_OUTPUT_CHECKS
433        _intermediateBuffer.IgnoreAll();
434        _intermediateBuffer.ReadFromBuffer(pData, length);
435        if (!_pMonitor->Feed(_intermediateBuffer)) {
436                ASSERT("Server sent invalid data");
437        }
438        _outputBuffer.ReadFromBuffer(
439                        GETIBPOINTER(_intermediateBuffer),
440                        GETAVAILABLEBYTESCOUNT(_intermediateBuffer));
441#else /* ENFORCE_RTMP_OUTPUT_CHECKS */
442        _outputBuffer.ReadFromBuffer(pData, length);
443#endif /* ENFORCE_RTMP_OUTPUT_CHECKS */
444        return EnqueueForOutbound();
445}
446
447void BaseRTMPProtocol::SetWinAckSize(uint32_t winAckSize) {
448        _nextReceivedBytesCountReport -= _winAckSize;
449        _winAckSize = winAckSize;
450        _nextReceivedBytesCountReport += _winAckSize;
451}
452
453uint32_t BaseRTMPProtocol::GetOutboundChunkSize() {
454        return _outboundChunkSize;
455}
456
457uint32_t BaseRTMPProtocol::GetInboundChunkSize() {
458        return _inboundChunkSize;
459}
460
461bool BaseRTMPProtocol::SetInboundChunkSize(uint32_t chunkSize) {
462        /*WARN("Chunk size changed for RTMP connection %p: %u->%u", this,
463                        _inboundChunkSize, chunkSize);*/
464        _inboundChunkSize = chunkSize;
465        for (uint32_t i = 0; i < MAX_STREAMS_COUNT; i++) {
466                if (_streams[i] != NULL) {
467                        if (TAG_KIND_OF(_streams[i]->GetType(), ST_IN_NET_RTMP))
468                                ((InNetRTMPStream *) _streams[i])->SetChunkSize(_inboundChunkSize);
469                }
470        }
471        return true;
472}
473
474void BaseRTMPProtocol::TrySetOutboundChunkSize(uint32_t chunkSize) {
475        if (_outboundChunkSize >= chunkSize) {
476                return;
477        }
478        _outboundChunkSize = chunkSize;
479        Variant chunkSizeMessage = GenericMessageFactory::GetChunkSize(_outboundChunkSize);
480        SendMessage(chunkSizeMessage);
481        for (uint32_t i = 0; i < MAX_STREAMS_COUNT; i++) {
482                if (_streams[i] != NULL) {
483                        if (TAG_KIND_OF(_streams[i]->GetType(), ST_OUT_NET_RTMP))
484                                ((BaseOutNetRTMPStream *) _streams[i])->SetChunkSize(_outboundChunkSize);
485                }
486        }
487}
488
489BaseStream * BaseRTMPProtocol::GetRTMPStream(uint32_t rtmpStreamId) {
490        if (rtmpStreamId == 0 || rtmpStreamId >= MAX_STREAMS_COUNT) {
491                FATAL("Invalid stream id: %u", rtmpStreamId);
492                return NULL;
493        }
494        return _streams[rtmpStreamId];
495}
496
497bool BaseRTMPProtocol::CloseStream(uint32_t streamId, bool createNeutralStream) {
498        //1. Validate request
499        if (streamId == 0 || streamId >= MAX_STREAMS_COUNT) {
500                FATAL("Invalid stream id: %u", streamId);
501                return false;
502        }
503
504        if (_streams[streamId] == NULL) {
505                WARN("Try to close a NULL stream");
506                return true;
507        }
508
509        if (TAG_KIND_OF(_streams[streamId]->GetType(), ST_OUT_NET_RTMP)) {
510                //2. Remove it from signaled streams
511                LinkedListNode<BaseOutNetRTMPStream *> *pTemp = _pSignaledRTMPOutNetStream;
512                while (pTemp != NULL) {
513                        if (pTemp->info->GetRTMPStreamId() == streamId) {
514                                _pSignaledRTMPOutNetStream =
515                                                RemoveLinkedList<BaseOutNetRTMPStream *>(pTemp);
516                                break;
517                        }
518                        pTemp = pTemp->pPrev;
519                }
520
521                //3. If this is an outbound network stream and his publisher
522                //is a file, close that as well
523                BaseOutNetRTMPStream *pBaseOutNetRTMPStream = (BaseOutNetRTMPStream *) _streams[streamId];
524                if (pBaseOutNetRTMPStream->GetInStream() != NULL) {
525                        if (TAG_KIND_OF(pBaseOutNetRTMPStream->GetInStream()->GetType(), ST_IN_FILE_RTMP))
526                                RemoveIFS((InFileRTMPStream *) pBaseOutNetRTMPStream->GetInStream());
527                }
528        }
529
530        //4. Delete the stream and replace it with a neutral one
531        delete _streams[streamId];
532        _streams[streamId] = NULL;
533        if ((createNeutralStream) && (GetApplication() != NULL)) {
534                _streams[streamId] = new RTMPStream(this,
535                                GetApplication()->GetStreamsManager(), streamId);
536        }
537
538        return true;
539}
540
541RTMPStream * BaseRTMPProtocol::CreateNeutralStream(uint32_t & streamId) {
542        if (streamId == 0) {
543                //Automatic allocation
544                for (uint32_t i = 1; i < MAX_STREAMS_COUNT; i++) {
545                        if (_streams[i] == NULL) {
546                                streamId = i;
547                                break;
548                        }
549                }
550
551                if (streamId == 0) {
552                        return NULL;
553                }
554        } else {
555                if (streamId == 0 || streamId >= MAX_STREAMS_COUNT) {
556                        FATAL("Invalid stream id: %u", streamId);
557                        return NULL;
558                }
559                if (_streams[streamId] != NULL) {
560                        FATAL("Try to create a neutral stream on a non NULL placeholder");
561                        return NULL;
562                }
563        }
564
565        RTMPStream *pStream = new RTMPStream(this,
566                        GetApplication()->GetStreamsManager(), streamId);
567        _streams[streamId] = pStream;
568
569        return pStream;
570}
571
572InNetRTMPStream * BaseRTMPProtocol::CreateINS(uint32_t channelId,
573                uint32_t streamId, string streamName) {
574        if (streamId == 0 || streamId >= MAX_STREAMS_COUNT) {
575                FATAL("Invalid stream id: %u", streamId);
576                return NULL;
577        }
578
579        if (_streams[streamId] == NULL) {
580                FATAL("Try to publish a stream on a NULL placeholder");
581                return NULL;
582        }
583
584        if (_streams[streamId]->GetType() != ST_NEUTRAL_RTMP) {
585                FATAL("Try to publish a stream over a non neutral stream");
586                return NULL;
587        }
588
589        delete _streams[streamId];
590        _streams[streamId] = NULL;
591
592        InNetRTMPStream *pStream = _pProtocolHandler->CreateInNetStream(this,
593                        channelId, streamId, streamName);
594        _streams[streamId] = pStream;
595
596        return pStream;
597}
598
599BaseOutNetRTMPStream * BaseRTMPProtocol::CreateONS(uint32_t streamId,
600                string streamName, uint64_t inStreamType) {
601        if (streamId == 0 || streamId >= MAX_STREAMS_COUNT) {
602                FATAL("Invalid stream id: %u", streamId);
603                return NULL;
604        }
605
606        if (_streams[streamId] == NULL) {
607                WARN("Try to play a stream on a NULL placeholder");
608        } else {
609
610                if (_streams[streamId]->GetType() != ST_NEUTRAL_RTMP) {
611                        FATAL("Try to play a stream over a non neutral stream: id: %u; type: %"PRIu64,
612                                        streamId, _streams[streamId]->GetType());
613                        return NULL;
614                }
615
616                delete _streams[streamId];
617                _streams[streamId] = NULL;
618        }
619
620        BaseOutNetRTMPStream *pBaseOutNetRTMPStream = BaseOutNetRTMPStream::GetInstance(
621                        this, GetApplication()->GetStreamsManager(), streamName, streamId,
622                        _outboundChunkSize, inStreamType);
623
624        if (pBaseOutNetRTMPStream == NULL) {
625                FATAL("Unable to create stream");
626                return NULL;
627        }
628
629        _streams[streamId] = pBaseOutNetRTMPStream;
630
631        return pBaseOutNetRTMPStream;
632}
633
634void BaseRTMPProtocol::SignalONS(BaseOutNetRTMPStream *pONS) {
635        LinkedListNode<BaseOutNetRTMPStream *> *pTemp = _pSignaledRTMPOutNetStream;
636        while (pTemp != NULL) {
637                if (pTemp->info == pONS) {
638                        return;
639                }
640                pTemp = pTemp->pPrev;
641        }
642        _pSignaledRTMPOutNetStream = AddLinkedList<BaseOutNetRTMPStream *>(
643                        _pSignaledRTMPOutNetStream, pONS, true);
644}
645
646InFileRTMPStream * BaseRTMPProtocol::CreateIFS(Variant &metadata) {
647        InFileRTMPStream *pRTMPInFileStream = InFileRTMPStream::GetInstance(
648                        this, GetApplication()->GetStreamsManager(), metadata);
649        if (pRTMPInFileStream == NULL) {
650                FATAL("Unable to get file stream. Metadata:\n%s", STR(metadata.ToString()));
651                return NULL;
652        }
653        bool hasTimer = true;
654        if (metadata.HasKeyChain(V_BOOL, true, 1, "hasTimer"))
655                hasTimer = (bool)metadata["hasTimer"];
656        if (!pRTMPInFileStream->Initialize(
657                        (int32_t) metadata[CONF_APPLICATION_CLIENTSIDEBUFFER], hasTimer)) {
658                FATAL("Unable to initialize file inbound stream");
659                delete pRTMPInFileStream;
660                return NULL;
661        }
662        _inFileStreams[pRTMPInFileStream] = pRTMPInFileStream;
663        return pRTMPInFileStream;
664}
665
666void BaseRTMPProtocol::RemoveIFS(InFileRTMPStream *pIFS) {
667        _inFileStreams.erase(pIFS);
668        delete pIFS;
669}
670
671Channel *BaseRTMPProtocol::ReserveChannel() {
672        if (_channelsPool.size() == 0)
673                return 0;
674
675        uint32_t result = 0;
676        result = _channelsPool[0];
677        _channelsPool.erase(_channelsPool.begin());
678
679        return &_channels[result];
680}
681
682void BaseRTMPProtocol::ReleaseChannel(Channel *pChannel) {
683        if (pChannel == NULL)
684                return;
685        if (pChannel->id <= 63)
686                ADD_VECTOR_BEGIN(_channelsPool, pChannel->id);
687        else
688                ADD_VECTOR_END(_channelsPool, pChannel->id);
689}
690
691bool BaseRTMPProtocol::EnqueueForTimeEvent(uint32_t seconds) {
692        ASSERT("Operation not supported. Please use a timer protocol");
693        return false;
694}
695
696uint32_t BaseRTMPProtocol::GetDHOffset(uint8_t *pBuffer, uint8_t schemeNumber) {
697        switch (schemeNumber) {
698                case 0:
699                {
700                        return GetDHOffset0(pBuffer);
701                }
702                case 1:
703                {
704                        return GetDHOffset1(pBuffer);
705                }
706                default:
707                {
708                        WARN("Invalid scheme number: %hhu. Defaulting to 0", schemeNumber);
709                        return GetDHOffset0(pBuffer);
710                }
711        }
712}
713
714uint32_t BaseRTMPProtocol::GetDigestOffset(uint8_t *pBuffer, uint8_t schemeNumber) {
715        switch (schemeNumber) {
716                case 0:
717                {
718                        return GetDigestOffset0(pBuffer);
719                }
720                case 1:
721                {
722                        return GetDigestOffset1(pBuffer);
723                }
724                default:
725                {
726                        WARN("Invalid scheme number: %hhu. Defaulting to 0", schemeNumber);
727                        return GetDigestOffset0(pBuffer);
728                }
729        }
730}
731
732uint32_t BaseRTMPProtocol::GetDHOffset0(uint8_t *pBuffer) {
733        uint32_t offset = pBuffer[1532] + pBuffer[1533] + pBuffer[1534] + pBuffer[1535];
734        offset = offset % 632;
735        offset = offset + 772;
736        if (offset + 128 >= 1536) {
737                ASSERT("Invalid DH offset");
738        }
739        return offset;
740}
741
742uint32_t BaseRTMPProtocol::GetDHOffset1(uint8_t *pBuffer) {
743        uint32_t offset = pBuffer[768] + pBuffer[769] + pBuffer[770] + pBuffer[771];
744        offset = offset % 632;
745        offset = offset + 8;
746        if (offset + 128 >= 1536) {
747                ASSERT("Invalid DH offset");
748        }
749        return offset;
750}
751
752uint32_t BaseRTMPProtocol::GetDigestOffset0(uint8_t *pBuffer) {
753        uint32_t offset = pBuffer[8] + pBuffer[9] + pBuffer[10] + pBuffer[11];
754        offset = offset % 728;
755        offset = offset + 12;
756        if (offset + 32 >= 1536) {
757                ASSERT("Invalid digest offset");
758        }
759        return offset;
760}
761
762uint32_t BaseRTMPProtocol::GetDigestOffset1(uint8_t *pBuffer) {
763        uint32_t offset = pBuffer[772] + pBuffer[773] + pBuffer[774] + pBuffer[775];
764        offset = offset % 728;
765        offset = offset + 776;
766        if (offset + 32 >= 1536) {
767                ASSERT("Invalid digest offset");
768        }
769        return offset;
770}
771
772bool BaseRTMPProtocol::ProcessBytes(IOBuffer &buffer) {
773        while (true) {
774                uint32_t availableBytesCount = GETAVAILABLEBYTESCOUNT(buffer);
775                if (_selectedChannel < 0) {
776                        if (availableBytesCount < 1) {
777                                return true;
778                        } else {
779                                switch (GETIBPOINTER(buffer)[0]&0x3f) {
780                                        case 0:
781                                        {
782                                                if (availableBytesCount < 2) {
783                                                        FINEST("Not enough data");
784                                                        return true;
785                                                }
786                                                _selectedChannel = 64 + GETIBPOINTER(buffer)[1];
787                                                _channels[_selectedChannel].lastInHeaderType = GETIBPOINTER(buffer)[0] >> 6;
788                                                buffer.Ignore(2);
789                                                availableBytesCount -= 2;
790                                                break;
791                                        }
792                                        case 1:
793                                        {
794                                                //                                              if (availableBytesCount < 3) {
795                                                //                                                      FINEST("Not enough data");
796                                                //                                                      return true;
797                                                //                                              }
798                                                //                                              _selectedChannel = GETIBPOINTER(buffer)[2]*256 + GETIBPOINTER(buffer)[1] + 64;
799                                                //                                              _channels[_selectedChannel].lastInHeaderType = GETIBPOINTER(buffer)[0] >> 6;
800                                                //                                              buffer.Ignore(3);
801                                                //                                              availableBytesCount -= 3;
802                                                //                                              break;
803                                                FATAL("The server doesn't support channel ids bigger than 319");
804                                                return false;
805                                        };
806                                        default:
807                                        {
808                                                _selectedChannel = GETIBPOINTER(buffer)[0]&0x3f;
809                                                _channels[_selectedChannel].lastInHeaderType = GETIBPOINTER(buffer)[0] >> 6;
810                                                buffer.Ignore(1);
811                                                availableBytesCount -= 1;
812                                                break;
813                                        }
814                                }
815                        }
816                }
817
818                if (_selectedChannel >= MAX_CHANNELS_COUNT) {
819                        FATAL("Bogus connection. Drop it like is hot");
820                        return false;
821                }
822
823                Channel &channel = _channels[_selectedChannel];
824                Header &header = channel.lastInHeader;
825
826                if (channel.state == CS_HEADER) {
827                        if (!header.Read(_selectedChannel, channel.lastInHeaderType,
828                                        buffer, availableBytesCount)) {
829                                FATAL("Unable to read header");
830                                return false;
831                        } else {
832                                if (!header.readCompleted)
833                                        return true;
834                                channel.state = CS_PAYLOAD;
835                                switch (channel.lastInHeaderType) {
836                                        case HT_FULL:
837                                        {
838                                                channel.lastInAbsTs = H_TS(header);
839                                                break;
840                                        }
841                                        case HT_SAME_STREAM:
842                                        case HT_SAME_LENGTH_AND_STREAM:
843                                        {
844                                                channel.lastInAbsTs += H_TS(header);
845                                                break;
846                                        }
847                                        case HT_CONTINUATION:
848                                        {
849                                                if (channel.lastInProcBytes == 0) {
850                                                        channel.lastInAbsTs += H_TS(header);
851                                                }
852                                                break;
853                                        }
854                                }
855                        }
856                }
857
858                if (channel.state == CS_PAYLOAD) {
859                        uint32_t tempSize = H_ML(header) - channel.lastInProcBytes;
860                        tempSize = (tempSize >= _inboundChunkSize) ? _inboundChunkSize : tempSize;
861                        uint32_t availableBytes = GETAVAILABLEBYTESCOUNT(buffer);
862                        if (tempSize > availableBytes)
863                                return true;
864                        channel.state = CS_HEADER;
865                        _selectedChannel = -1;
866                        switch (H_MT(header)) {
867                                case RM_HEADER_MESSAGETYPE_VIDEODATA:
868                                {
869                                        if (H_SI(header) >= MAX_STREAMS_COUNT) {
870                                                FATAL("The server doesn't support stream ids bigger than %"PRIu32,
871                                                                (uint32_t) MAX_STREAMS_COUNT);
872                                                return false;
873                                        }
874                                        if ((_streams[H_SI(header)] != NULL)
875                                                        && (_streams[H_SI(header)]->GetType() == ST_IN_NET_RTMP)) {
876                                                if (!((InNetRTMPStream *) _streams[H_SI(header)])->FeedData(
877                                                                GETIBPOINTER(buffer), //pData,
878                                                                tempSize, //dataLength,
879                                                                channel.lastInProcBytes, //processedLength,
880                                                                H_ML(header), //totalLength,
881                                                                channel.lastInAbsTs, //absoluteTimestamp,
882                                                                false //isAudio
883                                                                )) {
884                                                        FATAL("Unable to feed video");
885                                                        return false;
886                                                }
887                                        }
888
889                                        channel.lastInProcBytes += tempSize;
890                                        if (H_ML(header) == channel.lastInProcBytes) {
891                                                channel.lastInProcBytes = 0;
892                                        }
893                                        if (!buffer.Ignore(tempSize)) {
894                                                FATAL("V: Unable to ignore %u bytes", tempSize);
895                                                return false;
896                                        }
897                                        break;
898                                }
899                                case RM_HEADER_MESSAGETYPE_AUDIODATA:
900                                {
901                                        if (H_SI(header) >= MAX_STREAMS_COUNT) {
902                                                FATAL("The server doesn't support stream ids bigger than %"PRIu32,
903                                                                (uint32_t) MAX_STREAMS_COUNT);
904                                                return false;
905                                        }
906                                        if ((_streams[H_SI(header)] != NULL)
907                                                        && (_streams[H_SI(header)]->GetType() == ST_IN_NET_RTMP)) {
908                                                if (!((InNetRTMPStream *) _streams[H_SI(header)])->FeedData(
909                                                                GETIBPOINTER(buffer), //pData,
910                                                                tempSize, //dataLength,
911                                                                channel.lastInProcBytes, //processedLength,
912                                                                H_ML(header), //totalLength,
913                                                                channel.lastInAbsTs, //absoluteTimestamp,
914                                                                true //isAudio
915                                                                )) {
916                                                        FATAL("Unable to feed audio");
917                                                        return false;
918                                                }
919                                        }
920
921
922                                        channel.lastInProcBytes += tempSize;
923                                        if (H_ML(header) == channel.lastInProcBytes) {
924                                                channel.lastInProcBytes = 0;
925                                        }
926                                        if (!buffer.Ignore(tempSize)) {
927                                                FATAL("A: Unable to ignore %u bytes", tempSize);
928                                                return false;
929                                        }
930                                        break;
931                                }
932                                default:
933                                {
934                                        channel.inputData.ReadFromInputBuffer(buffer, tempSize);
935                                        channel.lastInProcBytes += tempSize;
936                                        if (!buffer.Ignore(tempSize)) {
937                                                FATAL("Unable to ignore %u bytes", tempSize);
938                                                return false;
939                                        }
940                                        if (H_ML(header) == channel.lastInProcBytes) {
941                                                channel.lastInProcBytes = 0;
942                                                if (_pProtocolHandler == NULL) {
943                                                        FATAL("RTMP connection no longer associated with an application");
944                                                        return false;
945                                                }
946                                                if (!_pProtocolHandler->InboundMessageAvailable(this, header, channel.inputData)) {
947                                                        FATAL("Unable to send rtmp message to application");
948                                                        return false;
949                                                }
950                                                _rxInvokes++;
951
952                                                if (GETAVAILABLEBYTESCOUNT(channel.inputData) != 0) {
953                                                        FATAL("Invalid message!!! We have leftovers: %u bytes",
954                                                                        GETAVAILABLEBYTESCOUNT(channel.inputData));
955                                                        return false;
956                                                }
957                                        }
958                                        break;
959                                }
960                        }
961                }
962        }
963}
964#endif /* HAS_PROTOCOL_RTMP */
Note: See TracBrowser for help on using the repository browser.