source: trunk/sources/thelib/src/protocols/rtmp/monitorrtmpprotocol.cpp @ 722

Revision 722, 8.5 KB checked in by shiretu, 4 months ago (diff)

-- replaced all assert() calls with o_assert wrapper
-- small formatting here and there

Line 
1/*
2 *  Copyright (c) 2010,
3 *  Gavriloaie Eugen-Andrei (shiretu@gmail.com)
4 *
5 *  This file is part of crtmpserver.
6 *  crtmpserver is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  crtmpserver is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with crtmpserver.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20
21#ifdef HAS_PROTOCOL_RTMP
22#include "application/clientapplicationmanager.h"
23#include "netio/netio.h"
24#include "protocols/protocolmanager.h"
25#include "protocols/rtmp/monitorrtmpprotocol.h"
26#include "protocols/rtmp/basertmpappprotocolhandler.h"
27#include "protocols/rtmp/messagefactories/messagefactories.h"
28#include "protocols/rtmp/streaming/baseoutnetrtmpstream.h"
29#include "streaming/streamstypes.h"
30#include "protocols/rtmp/streaming/innetrtmpstream.h"
31#include "protocols/rtmp/streaming/infilertmpstream.h"
32#include "protocols/rtmp/streaming/rtmpstream.h"
33#include "streaming/streamstypes.h"
34
35MonitorRTMPProtocol::MonitorRTMPProtocol(uint32_t maxStreamCount,
36                uint32_t maxChannelsCount)
37: BaseProtocol(PT_INBOUND_RTMP) {
38        _maxChannelsCount = maxChannelsCount;
39        _channels = NULL;
40        _channels = new Channel[maxChannelsCount];
41        for (uint32_t i = 0; i < _maxChannelsCount; i++) {
42                memset(&_channels[i], 0, sizeof (Channel));
43                _channels[i].id = i;
44                _channels[i].lastOutStreamId = 0xffffffff;
45        }
46        _selectedChannel = -1;
47        _inboundChunkSize = 128;
48        _maxStreamCount = maxStreamCount;
49}
50
51MonitorRTMPProtocol::~MonitorRTMPProtocol() {
52        if (_channels != NULL) {
53                delete[] _channels;
54                _channels = NULL;
55        }
56}
57
58bool MonitorRTMPProtocol::Initialize(Variant &parameters) {
59        GetCustomParameters() = parameters;
60        return true;
61}
62
63bool MonitorRTMPProtocol::AllowFarProtocol(uint64_t type) {
64        return false;
65}
66
67bool MonitorRTMPProtocol::AllowNearProtocol(uint64_t type) {
68        return false;
69}
70
71bool MonitorRTMPProtocol::SignalInputData(int32_t recvAmount) {
72        ASSERT("OPERATION NOT SUPPORTED");
73        return false;
74}
75
76bool MonitorRTMPProtocol::SignalInputData(IOBuffer &buffer) {
77        return ProcessBytes(buffer);
78}
79
80bool MonitorRTMPProtocol::SetInboundChunkSize(uint32_t chunkSize) {
81        _inboundChunkSize = chunkSize;
82        return true;
83}
84
85bool MonitorRTMPProtocol::Feed(IOBuffer &buffer) {
86        _input.ReadFromBuffer(GETIBPOINTER(buffer), GETAVAILABLEBYTESCOUNT(buffer));
87        return SignalInputData(_input);
88}
89
90bool MonitorRTMPProtocol::ProcessBytes(IOBuffer &buffer) {
91        while (true) {
92                uint32_t availableBytesCount = GETAVAILABLEBYTESCOUNT(buffer);
93                if (_selectedChannel < 0) {
94                        if (availableBytesCount < 1) {
95                                return true;
96                        } else {
97                                switch (GETIBPOINTER(buffer)[0]&0x3f) {
98                                        case 0:
99                                        {
100                                                if (availableBytesCount < 2) {
101                                                        FINEST("Not enough data");
102                                                        return true;
103                                                }
104                                                _selectedChannel = 64 + GETIBPOINTER(buffer)[1];
105                                                _channels[_selectedChannel].lastInHeaderType = GETIBPOINTER(buffer)[0] >> 6;
106                                                buffer.Ignore(2);
107                                                availableBytesCount -= 2;
108                                                break;
109                                        }
110                                        case 1:
111                                        {
112                                                //                                              if (availableBytesCount < 3) {
113                                                //                                                      FINEST("Not enough data");
114                                                //                                                      return true;
115                                                //                                              }
116                                                //                                              _selectedChannel = GETIBPOINTER(buffer)[2]*256 + GETIBPOINTER(buffer)[1] + 64;
117                                                //                                              _channels[_selectedChannel].lastInHeaderType = GETIBPOINTER(buffer)[0] >> 6;
118                                                //                                              buffer.Ignore(3);
119                                                //                                              availableBytesCount -= 3;
120                                                //                                              break;
121                                                FATAL("The server doesn't support channel ids bigger than 319");
122                                                return false;
123                                        };
124                                        default:
125                                        {
126                                                _selectedChannel = GETIBPOINTER(buffer)[0]&0x3f;
127                                                _channels[_selectedChannel].lastInHeaderType = GETIBPOINTER(buffer)[0] >> 6;
128                                                buffer.Ignore(1);
129                                                availableBytesCount -= 1;
130                                                break;
131                                        }
132                                }
133                        }
134                }
135
136                Channel &channel = _channels[_selectedChannel];
137                Header &header = channel.lastInHeader;
138                FINEST("header: %s", STR(header));
139
140                if (channel.state == CS_HEADER) {
141                        if (!header.Read(_selectedChannel, channel.lastInHeaderType,
142                                        buffer, availableBytesCount)) {
143                                FATAL("Unable to read header");
144                                return false;
145                        } else {
146                                if (!header.readCompleted)
147                                        return true;
148
149                                if (H_SI(header) >= _maxStreamCount) {
150                                        FATAL("%s", STR(header));
151                                        FATAL("buffer:\n%s", STR(buffer));
152                                        ASSERT("invalid stream index");
153                                }
154
155                                if (H_CI(header) >= _maxChannelsCount) {
156                                        FATAL("%s", STR(header));
157                                        FATAL("buffer:\n%s", STR(buffer));
158                                        ASSERT("invalid channel index");
159                                }
160
161                                switch ((uint8_t) H_MT(header)) {
162                                        case RM_HEADER_MESSAGETYPE_ABORTMESSAGE:
163                                        case RM_HEADER_MESSAGETYPE_ACK:
164                                        case RM_HEADER_MESSAGETYPE_AGGREGATE:
165                                        case RM_HEADER_MESSAGETYPE_AUDIODATA:
166                                        case RM_HEADER_MESSAGETYPE_CHUNKSIZE:
167                                        case RM_HEADER_MESSAGETYPE_FLEX:
168                                        case RM_HEADER_MESSAGETYPE_FLEXSHAREDOBJECT:
169                                        case RM_HEADER_MESSAGETYPE_FLEXSTREAMSEND:
170                                        case RM_HEADER_MESSAGETYPE_INVOKE:
171                                        case RM_HEADER_MESSAGETYPE_NOTIFY:
172                                        case RM_HEADER_MESSAGETYPE_PEERBW:
173                                        case RM_HEADER_MESSAGETYPE_SHAREDOBJECT:
174                                        case RM_HEADER_MESSAGETYPE_USRCTRL:
175                                        case RM_HEADER_MESSAGETYPE_VIDEODATA:
176                                        case RM_HEADER_MESSAGETYPE_WINACKSIZE:
177                                        {
178                                                break;
179                                        }
180                                        default:
181                                        {
182                                                FATAL("%s", STR(header));
183                                                FATAL("buffer:\n%s", STR(buffer));
184                                                ASSERT("invalid message type");
185                                        }
186                                }
187                                channel.state = CS_PAYLOAD;
188                                switch (channel.lastInHeaderType) {
189                                        case HT_FULL:
190                                        {
191                                                channel.lastInAbsTs = H_TS(header);
192                                                break;
193                                        }
194                                        case HT_SAME_STREAM:
195                                        case HT_SAME_LENGTH_AND_STREAM:
196                                        {
197                                                channel.lastInAbsTs += H_TS(header);
198                                                break;
199                                        }
200                                        case HT_CONTINUATION:
201                                        {
202                                                if (channel.lastInProcBytes == 0) {
203                                                        channel.lastInAbsTs += H_TS(header);
204                                                }
205                                                break;
206                                        }
207                                }
208                        }
209                }
210
211                if (channel.state == CS_PAYLOAD) {
212                        uint32_t tempSize = H_ML(header) - channel.lastInProcBytes;
213                        tempSize = (tempSize >= _inboundChunkSize) ? _inboundChunkSize : tempSize;
214                        uint32_t availableBytes = GETAVAILABLEBYTESCOUNT(buffer);
215                        if (tempSize > availableBytes)
216                                return true;
217                        channel.state = CS_HEADER;
218                        _selectedChannel = -1;
219                        switch (H_MT(header)) {
220                                case RM_HEADER_MESSAGETYPE_VIDEODATA:
221                                {
222                                        if (H_SI(header) >= _maxStreamCount) {
223                                                FATAL("Incorrect stream index");
224                                                return false;
225                                        }
226
227                                        //FINEST("Video data");
228
229                                        channel.lastInProcBytes += tempSize;
230                                        if (H_ML(header) == channel.lastInProcBytes) {
231                                                channel.lastInProcBytes = 0;
232                                        }
233                                        if (!buffer.Ignore(tempSize)) {
234                                                FATAL("V: Unable to ignore %u bytes", tempSize);
235                                                return false;
236                                        }
237                                        break;
238                                }
239                                case RM_HEADER_MESSAGETYPE_AUDIODATA:
240                                {
241                                        if (H_SI(header) >= _maxStreamCount) {
242                                                FATAL("Incorrect stream index");
243                                                return false;
244                                        }
245
246                                        //FINEST("Audio data");
247
248                                        channel.lastInProcBytes += tempSize;
249                                        if (H_ML(header) == channel.lastInProcBytes) {
250                                                channel.lastInProcBytes = 0;
251                                        }
252                                        if (!buffer.Ignore(tempSize)) {
253                                                FATAL("A: Unable to ignore %u bytes", tempSize);
254                                                return false;
255                                        }
256                                        break;
257                                }
258                                default:
259                                {
260                                        channel.inputData.ReadFromInputBuffer(buffer, tempSize);
261                                        channel.lastInProcBytes += tempSize;
262                                        if (!buffer.Ignore(tempSize)) {
263                                                FATAL("Unable to ignore %u bytes", tempSize);
264                                                return false;
265                                        }
266                                        if (H_ML(header) == channel.lastInProcBytes) {
267                                                channel.lastInProcBytes = 0;
268                                                Variant msg;
269                                                if (!_rtmpProtocolSerializer.Deserialize(header, channel.inputData, msg)) {
270                                                        FATAL("Unable to deserialize message");
271                                                        return false;
272                                                }
273
274                                                if ((uint8_t) VH_MT(msg) == RM_HEADER_MESSAGETYPE_CHUNKSIZE) {
275                                                        _inboundChunkSize = (uint32_t) msg[RM_CHUNKSIZE];
276                                                }
277
278                                                if ((uint8_t) VH_MT(msg) == RM_HEADER_MESSAGETYPE_ABORTMESSAGE) {
279                                                        uint32_t channelId = (uint32_t) msg[RM_ABORTMESSAGE];
280                                                        if (channelId >= _maxChannelsCount) {
281                                                                FATAL("Invalid channel id in reset message: %"PRIu32, channelId);
282                                                                return false;
283                                                        }
284                                                        o_assert(_channels[channelId].id == channelId);
285                                                        _channels[channelId].Reset();
286                                                }
287
288                                                if (GETAVAILABLEBYTESCOUNT(channel.inputData) != 0) {
289                                                        FATAL("Invalid message!!! We have leftovers: %u bytes",
290                                                                        GETAVAILABLEBYTESCOUNT(channel.inputData));
291                                                        return false;
292                                                }
293                                        }
294                                        break;
295                                }
296                        }
297                }
298        }
299}
300#endif /* HAS_PROTOCOL_RTMP */
301
Note: See TracBrowser for help on using the repository browser.