source: trunk/sources/thelib/src/protocols/rtmp/basertmpappprotocolhandler.cpp @ 765

Revision 765, 76.3 KB checked in by shiretu, 13 days ago (diff)

-- fixes from EvoStream?

Line 
1/*
2 *  Copyright (c) 2010,
3 *  Gavriloaie Eugen-Andrei (shiretu@gmail.com)
4 *
5 *  This file is part of crtmpserver.
6 *  crtmpserver is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  crtmpserver is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with crtmpserver.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#ifdef HAS_PROTOCOL_RTMP
21#include "application/baseclientapplication.h"
22#include "protocols/rtmp/basertmpappprotocolhandler.h"
23#include "protocols/rtmp/basertmpprotocol.h"
24#include "protocols/rtmp/outboundrtmpprotocol.h"
25#include "protocols/rtmp/messagefactories/messagefactories.h"
26#include "application/clientapplicationmanager.h"
27#include "protocols/ts/basetsappprotocolhandler.h"
28#include "protocols/rtmp/streaming/baseoutnetrtmpstream.h"
29#include "protocols/rtmp/streaming/infilertmpstream.h"
30#include "protocols/rtmp/streaming/innetrtmpstream.h"
31#include "protocols/rtmp/sharedobjects/so.h"
32#include "protocols/rtmp/streaming/outfilertmpflvstream.h"
33#include "streaming/streamstypes.h"
34#include "streaming/baseinstream.h"
35#include "streaming/baseinnetstream.h"
36
37#define ONBWCHECK_SIZE 32767
38
39BaseRTMPAppProtocolHandler::BaseRTMPAppProtocolHandler(Variant &configuration)
40: BaseAppProtocolHandler(configuration) {
41        _validateHandshake = (bool)configuration[CONF_APPLICATION_VALIDATEHANDSHAKE];
42        _keyframeSeek = (bool)configuration[CONF_APPLICATION_KEYFRAMESEEK];
43        _clientSideBuffer = (int32_t) configuration[CONF_APPLICATION_CLIENTSIDEBUFFER];
44        _seekGranularity = (uint32_t) ((double) configuration[CONF_APPLICATION_SEEKGRANULARITY]*1000);
45        _mediaFolder = (string) configuration[CONF_APPLICATION_MEDIAFOLDER];
46        _renameBadFiles = (bool)configuration[CONF_APPLICATION_RENAMEBADFILES];
47        _externSeekGenerator = (bool)configuration[CONF_APPLICATION_EXTERNSEEKGENERATOR];
48        _enableCheckBandwidth = false;
49        if (_configuration.HasKeyChain(V_BOOL, false, 1, "enableCheckBandwidth")) {
50                _enableCheckBandwidth = (bool)_configuration.GetValue(
51                                "enableCheckBandwidth", false);
52        }
53        if (_enableCheckBandwidth) {
54                Variant parameters;
55                parameters.PushToArray(Variant());
56                parameters.PushToArray(generateRandomString(ONBWCHECK_SIZE));
57                _onBWCheckMessage = GenericMessageFactory::GetInvoke(3, 0, 0, false, 0,
58                                RM_INVOKE_FUNCTION_ONBWCHECK, parameters);
59                _onBWCheckStrippedMessage[RM_INVOKE][RM_INVOKE_FUNCTION] = RM_INVOKE_FUNCTION_ONBWCHECK;
60        }
61        _lastUsersFileUpdate = 0;
62        if ((bool)configuration[CONF_APPLICATION_GENERATE_META_FILES]) {
63                GenerateMetaFiles();
64        }
65}
66
67bool BaseRTMPAppProtocolHandler::ParseAuthenticationNode(Variant &node,
68                Variant &result) {
69#ifndef HAS_LUA
70        ASSERT("Lua is not supported by the current build of the server. Adobe authentication needs lua support");
71        return false;
72#endif
73        //1. Validation
74        if ((!node.HasKeyChain(V_STRING, true, 1, CONF_APPLICATION_AUTH_TYPE))
75                        || (node[CONF_APPLICATION_AUTH_TYPE] != CONF_APPLICATION_AUTH_TYPE_ADOBE)) {
76                FATAL("Invalid authentication type");
77                return false;
78        }
79
80        if ((!node.HasKeyChain(V_MAP, true, 1, CONF_APPLICATION_AUTH_ENCODER_AGENTS))
81                        || (node[CONF_APPLICATION_AUTH_ENCODER_AGENTS].MapSize() == 0)) {
82                FATAL("Invalid encoder agents array");
83                return false;
84        }
85
86        if ((!node.HasKeyChain(V_STRING, true, 1, CONF_APPLICATION_AUTH_USERS_FILE))
87                        || (node[CONF_APPLICATION_AUTH_USERS_FILE] == "")) {
88                FATAL("Invalid users file path");
89                return false;
90        }
91
92        //2. Users file validation
93        string usersFile = node[CONF_APPLICATION_AUTH_USERS_FILE];
94        if ((usersFile[0] != '/') && (usersFile[0] != '.')) {
95                usersFile = (string) _configuration[CONF_APPLICATION_DIRECTORY] + usersFile;
96        }
97        if (!fileExists(usersFile)) {
98                FATAL("Invalid authentication configuration. Missing users file: %s", STR(usersFile));
99                return false;
100        }
101
102        //3. Build the result
103        result[CONF_APPLICATION_AUTH_TYPE] = CONF_APPLICATION_AUTH_TYPE_ADOBE;
104        result[CONF_APPLICATION_AUTH_USERS_FILE] = usersFile;
105
106        FOR_MAP(node[CONF_APPLICATION_AUTH_ENCODER_AGENTS], string, Variant, i) {
107                if ((MAP_VAL(i) != V_STRING) || (MAP_VAL(i) == "")) {
108                        FATAL("Invalid encoder agent encountered");
109                        return false;
110                }
111                result[CONF_APPLICATION_AUTH_ENCODER_AGENTS][(string) MAP_VAL(i)] = MAP_VAL(i);
112        }
113        result["adobeAuthSalt"] = _adobeAuthSalt = generateRandomString(32);
114        _adobeAuthSettings = result;
115        _authMethod = CONF_APPLICATION_AUTH_TYPE_ADOBE;
116
117        double modificationDate = getFileModificationDate(usersFile);
118        if (modificationDate == 0) {
119                FATAL("Unable to get last modification date for file %s", STR(usersFile));
120                return false;
121        }
122
123        if (modificationDate != _lastUsersFileUpdate) {
124                _users.Reset();
125                if (!ReadLuaFile(usersFile, "users", _users)) {
126                        FATAL("Unable to read users file: `%s`", STR(usersFile));
127                        return false;
128                }
129                _lastUsersFileUpdate = modificationDate;
130        }
131
132        return true;
133}
134
135BaseRTMPAppProtocolHandler::~BaseRTMPAppProtocolHandler() {
136
137        FOR_MAP(_connections, uint32_t, BaseRTMPProtocol *, i) {
138                MAP_VAL(i)->SetApplication(NULL);
139                MAP_VAL(i)->EnqueueForDelete();
140        }
141}
142
143bool BaseRTMPAppProtocolHandler::ValidateHandshake() {
144        return _validateHandshake;
145}
146
147SOManager *BaseRTMPAppProtocolHandler::GetSOManager() {
148        return &_soManager;
149}
150
151void BaseRTMPAppProtocolHandler::SignalClientSOConnected(BaseRTMPProtocol *pFrom,
152                ClientSO *pClientSO) {
153}
154
155void BaseRTMPAppProtocolHandler::SignalClientSOUpdated(BaseRTMPProtocol *pFrom,
156                ClientSO *pClientSO) {
157        //      FINEST("%s", STR(pClientSO->ToString()));
158        //      NYI;
159}
160
161void BaseRTMPAppProtocolHandler::SignalClientSOSend(BaseRTMPProtocol *pFrom,
162                ClientSO *pClientSO, Variant &parameters) {
163        //      FINEST("parameters:\n%s", STR(parameters.ToString()));
164        //      NYI;
165}
166
167void BaseRTMPAppProtocolHandler::RegisterProtocol(BaseProtocol *pProtocol) {
168        //      FINEST("Register protocol %s to application %s",
169        //                      STR(*pProtocol), STR(GetApplication()->GetName()));
170        if (MAP_HAS1(_connections, pProtocol->GetId()))
171                return;
172        _connections[pProtocol->GetId()] = (BaseRTMPProtocol *) pProtocol;
173        _nextInvokeId[pProtocol->GetId()] = 1;
174}
175
176void BaseRTMPAppProtocolHandler::UnRegisterProtocol(BaseProtocol *pProtocol) {
177        _soManager.UnRegisterProtocol((BaseRTMPProtocol*) pProtocol);
178        if (!MAP_HAS1(_connections, pProtocol->GetId())) {
179                return;
180        }
181        _connections.erase(pProtocol->GetId());
182        _nextInvokeId.erase(pProtocol->GetId());
183        _resultMessageTracking.erase(pProtocol->GetId());
184}
185
186bool BaseRTMPAppProtocolHandler::PullExternalStream(URI uri, Variant streamConfig) {
187        //1. normalize the stream name
188        string localStreamName = "";
189        if (streamConfig["localStreamName"] == V_STRING)
190                localStreamName = (string) streamConfig["localStreamName"];
191        trim(localStreamName);
192        if (localStreamName == "") {
193                streamConfig["localStreamName"] = "stream_" + generateRandomString(8);
194                WARN("No localstream name for external URI: %s. Defaulted to %s",
195                                STR(uri.fullUri()),
196                                STR(streamConfig["localStreamName"]));
197        }
198
199        //2. Prepare the custom parameters
200        Variant parameters;
201        parameters["customParameters"]["externalStreamConfig"] = streamConfig;
202        parameters[CONF_APPLICATION_NAME] = GetApplication()->GetName();
203        string scheme = uri.scheme();
204        if (scheme == "rtmp") {
205                parameters[CONF_PROTOCOL] = CONF_PROTOCOL_OUTBOUND_RTMP;
206        } else if (scheme == "rtmpt") {
207                parameters[CONF_PROTOCOL] = CONF_PROTOCOL_OUTBOUND_RTMPT;
208        } else if (scheme == "rtmpe") {
209                parameters[CONF_PROTOCOL] = CONF_PROTOCOL_OUTBOUND_RTMPE;
210        } else {
211                FATAL("scheme %s not supported by RTMP handler", STR(scheme));
212                return false;
213        }
214
215        //3. start the connecting sequence
216        return OutboundRTMPProtocol::Connect(uri.ip(), uri.port(), parameters);
217}
218
219bool BaseRTMPAppProtocolHandler::PullExternalStream(URI &uri,
220                BaseRTMPProtocol *pFrom, string &sourceName, string &destName) {
221        //1. Get the streams manager
222        StreamsManager *pStreamsManager = GetApplication()->GetStreamsManager();
223
224        //2. Search for all streams named streamName having the type of IN_NET
225        map<uint32_t, BaseStream *> streams = pStreamsManager->FindByTypeByName(
226                        ST_IN_NET, destName, true, true);
227        if (streams.size() != 0) {
228                FATAL("Stream %s already found", STR(destName));
229                return false;
230        }
231
232        //4. Prepare the stream parameters
233        Variant &parameters = pFrom->GetCustomParameters();
234
235        parameters["customParameters"]["externalStreamConfig"]["emulateUserAgent"] = HTTP_HEADERS_SERVER_US;
236        parameters["customParameters"]["externalStreamConfig"]["forceTcp"] = (bool)false;
237        parameters["customParameters"]["externalStreamConfig"]["isHds"] = (bool)false;
238        parameters["customParameters"]["externalStreamConfig"]["isHls"] = (bool)false;
239        parameters["customParameters"]["externalStreamConfig"]["keepAlive"] = (bool)true;
240        parameters["customParameters"]["externalStreamConfig"]["localStreamName"] = destName;
241        parameters["customParameters"]["externalStreamConfig"]["pageUrl"] = "";
242        parameters["customParameters"]["externalStreamConfig"]["rtcpDetectionInterval"] = (uint32_t) 10;
243        parameters["customParameters"]["externalStreamConfig"]["swfUrl"] = "";
244        parameters["customParameters"]["externalStreamConfig"]["tcUrl"] = "";
245        parameters["customParameters"]["externalStreamConfig"]["tos"] = (uint8_t) 256;
246        parameters["customParameters"]["externalStreamConfig"]["ttl"] = (uint8_t) 256;
247        parameters["customParameters"]["externalStreamConfig"]["uri"] = uri;
248
249        //5. Create the createStream request
250        Variant createStreamRequest = StreamMessageFactory::GetInvokeCreateStream();
251
252        //6. Send it
253        if (!SendRTMPMessage(pFrom, createStreamRequest, true)) {
254                FATAL("Unable to send request:\n%s", STR(createStreamRequest.ToString()));
255                return false;
256        }
257
258        //7. Done
259        return true;
260}
261
262bool BaseRTMPAppProtocolHandler::PushLocalStream(Variant streamConfig) {
263        //1. get the stream name
264        string streamName = (string) streamConfig["localStreamName"];
265
266        //2. Get the streams manager
267        StreamsManager *pStreamsManager = GetApplication()->GetStreamsManager();
268
269        //3. Search for all streams named streamName having the type of IN_NET
270        map<uint32_t, BaseStream *> streams = pStreamsManager->FindByTypeByName(
271                        ST_IN_NET, streamName, true, true);
272        if (streams.size() == 0) {
273                FATAL("Stream %s not found", STR(streamName));
274                return false;
275        }
276
277        //4. See if inside the returned collection of streams
278        //we have something compatible with RTMP
279        BaseInStream *pInStream = NULL;
280
281        FOR_MAP(streams, uint32_t, BaseStream *, i) {
282                if ((MAP_VAL(i)->IsCompatibleWithType(ST_OUT_NET_RTMP_4_RTMP))
283                                || (MAP_VAL(i)->IsCompatibleWithType(ST_OUT_NET_RTMP_4_TS))) {
284                        pInStream = (BaseInStream *) MAP_VAL(i);
285                        break;
286                }
287        }
288        if (pInStream == NULL) {
289                WARN("Stream %s not found or is incompatible with RTMP output",
290                                STR(streamName));
291                return false;
292        }
293
294        //5. Prepare the custom parameters
295        Variant parameters;
296        parameters["customParameters"]["localStreamConfig"] = streamConfig;
297        parameters["customParameters"]["localStreamConfig"]["localUniqueStreamId"] = pInStream->GetUniqueId();
298        parameters[CONF_APPLICATION_NAME] = GetApplication()->GetName();
299        if (streamConfig["targetUri"]["scheme"] == "rtmp") {
300                parameters[CONF_PROTOCOL] = CONF_PROTOCOL_OUTBOUND_RTMP;
301        } else if (streamConfig["targetUri"]["scheme"] == "rtmpt") {
302                parameters[CONF_PROTOCOL] = CONF_PROTOCOL_OUTBOUND_RTMPT;
303        } else if (streamConfig["targetUri"]["scheme"] == "rtmpe") {
304                parameters[CONF_PROTOCOL] = CONF_PROTOCOL_OUTBOUND_RTMPE;
305        } else {
306                FATAL("scheme %s not supported by RTMP handler",
307                                STR(streamConfig["targetUri"]["scheme"]));
308                return false;
309        }
310
311        //6. start the connecting sequence
312        return OutboundRTMPProtocol::Connect(
313                        streamConfig["targetUri"]["ip"],
314                        (uint16_t) streamConfig["targetUri"]["port"],
315                        parameters);
316}
317
318bool BaseRTMPAppProtocolHandler::PushLocalStream(BaseRTMPProtocol *pFrom,
319                string sourceName, string destName) {
320        //1. Get the streams manager
321        StreamsManager *pStreamsManager = GetApplication()->GetStreamsManager();
322
323        //2. Search for all streams named streamName having the type of IN_NET
324        map<uint32_t, BaseStream *> streams = pStreamsManager->FindByTypeByName(
325                        ST_IN_NET, sourceName, true, true);
326        if (streams.size() == 0) {
327                FATAL("Stream %s not found", STR(sourceName));
328                return false;
329        }
330
331        //3. See if inside the returned collection of streams
332        //we have something compatible with RTMP
333        BaseInStream *pInStream = NULL;
334
335        FOR_MAP(streams, uint32_t, BaseStream *, i) {
336                if ((MAP_VAL(i)->IsCompatibleWithType(ST_OUT_NET_RTMP_4_RTMP))
337                                || (MAP_VAL(i)->IsCompatibleWithType(ST_OUT_NET_RTMP_4_TS))) {
338                        pInStream = (BaseInStream *) MAP_VAL(i);
339                        break;
340                }
341        }
342        if (pInStream == NULL) {
343                WARN("Stream %s not found or is incompatible with RTMP output",
344                                STR(sourceName));
345                return false;
346        }
347
348        //4. Prepare the stream parameters
349        Variant &parameters = pFrom->GetCustomParameters();
350        parameters["customParameters"]["localStreamConfig"]["emulateUserAgent"] = HTTP_HEADERS_SERVER_US;
351        parameters["customParameters"]["localStreamConfig"]["forceTcp"] = (bool)false;
352        parameters["customParameters"]["localStreamConfig"]["isHds"] = (bool)false;
353        parameters["customParameters"]["localStreamConfig"]["isHls"] = (bool)false;
354        parameters["customParameters"]["localStreamConfig"]["keepAlive"] = (bool)true;
355        parameters["customParameters"]["localStreamConfig"]["localStreamName"] = sourceName;
356        parameters["customParameters"]["localStreamConfig"]["pageUrl"] = "";
357        parameters["customParameters"]["localStreamConfig"]["swfUrl"] = "";
358        parameters["customParameters"]["localStreamConfig"]["targetStreamName"] = destName;
359        parameters["customParameters"]["localStreamConfig"]["targetStreamType"] = "live";
360        parameters["customParameters"]["localStreamConfig"]["targetUri"].IsArray(false);
361        parameters["customParameters"]["localStreamConfig"]["tcUrl"] = "";
362        parameters["customParameters"]["localStreamConfig"]["tos"] = (uint8_t) 256;
363        parameters["customParameters"]["localStreamConfig"]["ttl"] = (uint8_t) 256;
364        parameters["customParameters"]["localStreamConfig"]["localUniqueStreamId"] = pInStream->GetUniqueId();
365
366        //5. Create the createStream request
367        Variant createStreamRequest = StreamMessageFactory::GetInvokeCreateStream();
368
369        //6. Send it
370        if (!SendRTMPMessage(pFrom, createStreamRequest, true)) {
371                FATAL("Unable to send request:\n%s", STR(createStreamRequest.ToString()));
372                return false;
373        }
374
375        //7. Done
376        return true;
377}
378
379bool BaseRTMPAppProtocolHandler::OutboundConnectionEstablished(
380                OutboundRTMPProtocol *pFrom) {
381        if (NeedsToPullExternalStream(pFrom)) {
382                return PullExternalStream(pFrom);
383        }
384
385        if (NeedsToPushLocalStream(pFrom)) {
386                return PushLocalStream(pFrom);
387        }
388
389        WARN("You should override BaseRTMPAppProtocolHandler::OutboundConnectionEstablished");
390        return false;
391}
392
393bool BaseRTMPAppProtocolHandler::AuthenticateInbound(BaseRTMPProtocol *pFrom,
394                Variant &request, Variant &authState) {
395        if (_authMethod == CONF_APPLICATION_AUTH_TYPE_ADOBE) {
396                return AuthenticateInboundAdobe(pFrom, request, authState);
397        } else {
398                FATAL("Auth scheme not supported: %s", STR(_authMethod));
399                return false;
400        }
401}
402
403bool BaseRTMPAppProtocolHandler::InboundMessageAvailable(BaseRTMPProtocol *pFrom,
404                Header &header, IOBuffer &inputBuffer) {
405        Variant request;
406        if (!_rtmpProtocolSerializer.Deserialize(header, inputBuffer, request)) {
407                FATAL("Unable to deserialize message");
408                return false;
409        }
410
411        return InboundMessageAvailable(pFrom, request);
412}
413
414bool BaseRTMPAppProtocolHandler::InboundMessageAvailable(BaseRTMPProtocol *pFrom,
415                Variant &request) {
416
417        //1. Perform authentication
418        Variant &parameters = pFrom->GetCustomParameters();
419        if (!parameters.HasKey("authState"))
420                parameters["authState"].IsArray(false);
421        Variant &authState = parameters["authState"];
422
423        switch (pFrom->GetType()) {
424                case PT_INBOUND_RTMP:
425                {
426                        if (_authMethod != "") {
427                                if (!AuthenticateInbound(pFrom, request, authState)) {
428                                        FATAL("Unable to authenticate");
429                                        return false;
430                                }
431                        } else {
432                                authState["stage"] = "authenticated";
433                                authState["canPublish"] = (bool)true;
434                                authState["canOverrideStreamName"] = (bool)false;
435                        }
436                        break;
437                }
438                case PT_OUTBOUND_RTMP:
439                {
440                        authState["stage"] = "authenticated";
441                        authState["canPublish"] = (bool)true;
442                        authState["canOverrideStreamName"] = (bool)false;
443                        break;
444                }
445                default:
446                {
447                        WARN("Invalid protocol type");
448                        return false;
449                }
450        }
451
452        if (authState["stage"] == "failed") {
453                WARN("Authentication failed");
454                return false;
455        }
456
457        switch ((uint8_t) VH_MT(request)) {
458                case RM_HEADER_MESSAGETYPE_WINACKSIZE:
459                {
460                        return ProcessWinAckSize(pFrom, request);
461                }
462                case RM_HEADER_MESSAGETYPE_PEERBW:
463                {
464                        return ProcessPeerBW(pFrom, request);
465                }
466                case RM_HEADER_MESSAGETYPE_ACK:
467                {
468                        return ProcessAck(pFrom, request);
469                }
470                case RM_HEADER_MESSAGETYPE_CHUNKSIZE:
471                {
472                        return ProcessChunkSize(pFrom, request);
473                }
474                case RM_HEADER_MESSAGETYPE_USRCTRL:
475                {
476                        return ProcessUsrCtrl(pFrom, request);
477                }
478                case RM_HEADER_MESSAGETYPE_NOTIFY:
479                {
480                        return ProcessNotify(pFrom, request);
481                }
482                case RM_HEADER_MESSAGETYPE_FLEXSTREAMSEND:
483                {
484                        return ProcessFlexStreamSend(pFrom, request);
485                }
486                case RM_HEADER_MESSAGETYPE_INVOKE:
487                {
488                        return ProcessInvoke(pFrom, request);
489                }
490                case RM_HEADER_MESSAGETYPE_SHAREDOBJECT:
491                case RM_HEADER_MESSAGETYPE_FLEXSHAREDOBJECT:
492                {
493                        return ProcessSharedObject(pFrom, request);
494                }
495                case RM_HEADER_MESSAGETYPE_FLEX:
496                {
497                        return ProcessInvoke(pFrom, request);
498                }
499                case RM_HEADER_MESSAGETYPE_ABORTMESSAGE:
500                {
501                        return ProcessAbortMessage(pFrom, request);
502                }
503                default:
504                {
505                        FATAL("Request type not yet implemented:\n%s",
506                                        STR(request.ToString()));
507                        return false;
508                }
509        }
510}
511
512void BaseRTMPAppProtocolHandler::GenerateMetaFiles() {
513        vector<string> files;
514        if (!listFolder(_configuration[CONF_APPLICATION_MEDIAFOLDER],
515                        files)) {
516                FATAL("Unable to list folder %s",
517                                STR(_configuration[CONF_APPLICATION_MEDIAFOLDER]));
518                return;
519        }
520
521        string file;
522        string name;
523        string extension;
524        string lowercaseExtension;
525
526        FOR_VECTOR_ITERATOR(string, files, i) {
527                file = VECTOR_VAL(i);
528
529                splitFileName(file, name, extension);
530                lowercaseExtension = lowerCase(extension);
531
532                if (lowercaseExtension != MEDIA_TYPE_FLV
533                                && lowercaseExtension != MEDIA_TYPE_MP3
534                                && lowercaseExtension != MEDIA_TYPE_MP4
535                                && lowercaseExtension != MEDIA_TYPE_M4A
536                                && lowercaseExtension != MEDIA_TYPE_M4V
537                                && lowercaseExtension != MEDIA_TYPE_MOV
538                                && lowercaseExtension != MEDIA_TYPE_F4V)
539                        continue;
540                string flashName = "";
541                if (lowercaseExtension == MEDIA_TYPE_FLV) {
542                        flashName = name;
543                } else if (lowercaseExtension == MEDIA_TYPE_MP3) {
544                        flashName = lowercaseExtension + ":" + name;
545                } else {
546                        if (lowercaseExtension == MEDIA_TYPE_MP4
547                                        || lowercaseExtension == MEDIA_TYPE_M4A
548                                        || lowercaseExtension == MEDIA_TYPE_M4V
549                                        || lowercaseExtension == MEDIA_TYPE_MOV
550                                        || lowercaseExtension == MEDIA_TYPE_F4V) {
551                                flashName = MEDIA_TYPE_MP4":" + name + "." + extension;
552                        } else {
553                                flashName = lowercaseExtension + ":" + name + "." + extension;
554                        }
555                }
556
557                GetMetaData(flashName, true);
558        }
559}
560
561bool BaseRTMPAppProtocolHandler::ProcessAbortMessage(BaseRTMPProtocol *pFrom,
562                Variant &request) {
563        if (request[RM_ABORTMESSAGE] != _V_NUMERIC) {
564                FATAL("Invalid message: %s", STR(request.ToString()));
565                return false;
566        }
567        return pFrom->ResetChannel((uint32_t) request[RM_ABORTMESSAGE]);
568}
569
570bool BaseRTMPAppProtocolHandler::ProcessWinAckSize(BaseRTMPProtocol *pFrom,
571                Variant &request) {
572        if (request[RM_WINACKSIZE] != _V_NUMERIC) {
573                FATAL("Invalid message: %s", STR(request.ToString()));
574                return false;
575        }
576        uint32_t size = (uint32_t) request[RM_WINACKSIZE];
577        if ((size > 16 * 1024 * 1024) || size == 0) {
578                FATAL("Invalid message: %s", STR(request.ToString()));
579                return false;
580        }
581        pFrom->SetWinAckSize(request[RM_WINACKSIZE]);
582        return true;
583}
584
585bool BaseRTMPAppProtocolHandler::ProcessPeerBW(BaseRTMPProtocol *pFrom,
586                Variant &request) {
587        //WARN("ProcessPeerBW");
588        return true;
589}
590
591bool BaseRTMPAppProtocolHandler::ProcessAck(BaseRTMPProtocol *pFrom,
592                Variant &request) {
593        return true;
594}
595
596bool BaseRTMPAppProtocolHandler::ProcessChunkSize(BaseRTMPProtocol *pFrom,
597                Variant &request) {
598        if (request[RM_CHUNKSIZE] != _V_NUMERIC) {
599                FATAL("Invalid message: %s", STR(request.ToString()));
600                return false;
601        }
602        uint32_t size = (uint32_t) request[RM_CHUNKSIZE];
603        if ((size > 4 * 1024 * 1024) || size == 0) {
604                FATAL("Invalid message: %s", STR(request.ToString()));
605                return false;
606        }
607        if (!pFrom->SetInboundChunkSize(size)) {
608                FATAL("Unable to set chunk size:\n%s", STR(request.ToString()));
609                return false;
610        }
611
612        return true;
613}
614
615bool BaseRTMPAppProtocolHandler::ProcessUsrCtrl(BaseRTMPProtocol *pFrom,
616                Variant &request) {
617        switch ((uint16_t) M_USRCTRL_TYPE(request)) {
618                case RM_USRCTRL_TYPE_PING_REQUEST:
619                {
620                        Variant response = ConnectionMessageFactory::GetPong((uint32_t) M_USRCTRL_PING(request));
621                        return SendRTMPMessage(pFrom, response);
622                }
623                case RM_USRCTRL_TYPE_STREAM_BEGIN:
624                case RM_USRCTRL_TYPE_STREAM_SET_BUFFER_LENGTH:
625                case RM_USRCTRL_TYPE_STREAM_IS_RECORDED:
626                case RM_USRCTRL_TYPE_PING_RESPONSE:
627                {
628                        WARN("User control message type: %s", STR(M_USRCTRL_TYPE_STRING(request)));
629                        return true;
630                }
631                case RM_USRCTRL_TYPE_UNKNOWN1:
632                case RM_USRCTRL_TYPE_UNKNOWN2:
633                {
634                        return true;
635                }
636                default:
637                {
638                        FATAL("Invalid user ctrl:\n%s", STR(request.ToString()));
639                        return false;
640                }
641        }
642}
643
644bool BaseRTMPAppProtocolHandler::ProcessNotify(BaseRTMPProtocol *pFrom,
645                Variant &request) {
646        //1. Find the corresponding inbound stream
647        InNetRTMPStream *pInNetRTMPStream = NULL;
648        map<uint32_t, BaseStream *> possibleStreams = GetApplication()->
649                        GetStreamsManager()->FindByProtocolIdByType(pFrom->GetId(), ST_IN_NET_RTMP, false);
650
651        FOR_MAP(possibleStreams, uint32_t, BaseStream *, i) {
652                if (((InNetRTMPStream *) MAP_VAL(i))->GetRTMPStreamId() == (uint32_t) VH_SI(request)) {
653                        pInNetRTMPStream = (InNetRTMPStream *) MAP_VAL(i);
654                        break;
655                }
656        }
657        if (pInNetRTMPStream == NULL) {
658                WARN("No stream found. Searched for %u:%u. Message was:\n%s",
659                                pFrom->GetId(),
660                                (uint32_t) VH_SI(request),
661                                STR(request.ToString()));
662                return true;
663        }
664
665        //2. Remove all string values starting with @
666        //TODO: Wtf are those!?
667        vector<string> removedKeys;
668
669        FOR_MAP(M_NOTIFY_PARAMS(request), string, Variant, i) {
670                if ((VariantType) MAP_VAL(i) == V_STRING) {
671                        if (((string) MAP_VAL(i)).find("@") == 0)
672                                ADD_VECTOR_END(removedKeys, MAP_KEY(i));
673                }
674        }
675
676        FOR_VECTOR(removedKeys, i) {
677
678                M_NOTIFY_PARAMS(request).RemoveKey(removedKeys[i]);
679        }
680
681        //3. Brodcast the message on the inbound stream
682        return pInNetRTMPStream->SendStreamMessage(request);
683}
684
685bool BaseRTMPAppProtocolHandler::ProcessFlexStreamSend(BaseRTMPProtocol *pFrom,
686                Variant &request) {
687
688        //1. Find the corresponding inbound stream
689        InNetRTMPStream *pInNetRTMPStream = NULL;
690        map<uint32_t, BaseStream *> possibleStreams = GetApplication()->
691                        GetStreamsManager()->FindByProtocolIdByType(pFrom->GetId(), ST_IN_NET_RTMP, false);
692
693        FOR_MAP(possibleStreams, uint32_t, BaseStream *, i) {
694                if (((InNetRTMPStream *) MAP_VAL(i))->GetRTMPStreamId() == (uint32_t) VH_SI(request)) {
695                        pInNetRTMPStream = (InNetRTMPStream *) MAP_VAL(i);
696                        break;
697                }
698        }
699        if (pInNetRTMPStream == NULL) {
700                WARN("No stream found. Searched for %u:%u",
701                                pFrom->GetId(),
702                                (uint32_t) VH_SI(request));
703                return true;
704        }
705
706        //3. Remove all string values starting with @
707        //TODO: Wtf are those!?
708        vector<string> removedKeys;
709
710        FOR_MAP(M_FLEXSTREAMSEND_PARAMS(request), string, Variant, i) {
711                if ((VariantType) MAP_VAL(i) == V_STRING) {
712
713                        if (((string) MAP_VAL(i)).find("@") == 0)
714                                ADD_VECTOR_END(removedKeys, MAP_KEY(i));
715                }
716        }
717
718        FOR_VECTOR(removedKeys, i) {
719
720                M_FLEXSTREAMSEND_PARAMS(request).RemoveKey(removedKeys[i]);
721        }
722
723        //4. Brodcast the message on the inbound stream
724        return pInNetRTMPStream->SendStreamMessage(request);
725}
726
727bool BaseRTMPAppProtocolHandler::ProcessSharedObject(BaseRTMPProtocol *pFrom,
728                Variant &request) {
729        return _soManager.Process(pFrom, request);
730}
731
732bool BaseRTMPAppProtocolHandler::ProcessInvoke(BaseRTMPProtocol *pFrom,
733                Variant &request) {
734        //PROD_ACCESS(CreateLogEventInvoke(pFrom, request));
735        string functionName = request[RM_INVOKE][RM_INVOKE_FUNCTION];
736        uint32_t currentInvokeId = M_INVOKE_ID(request);
737        if (currentInvokeId != 0) {
738                if (_nextInvokeId[pFrom->GetId()] <= currentInvokeId) {
739                        _nextInvokeId[pFrom->GetId()] = currentInvokeId + 1;
740                }
741        }
742        if (functionName == RM_INVOKE_FUNCTION_CONNECT) {
743                return ProcessInvokeConnect(pFrom, request);
744        } else if (functionName == RM_INVOKE_FUNCTION_CREATESTREAM) {
745                return ProcessInvokeCreateStream(pFrom, request);
746        } else if (functionName == RM_INVOKE_FUNCTION_PUBLISH) {
747                return ProcessInvokePublish(pFrom, request);
748        } else if (functionName == RM_INVOKE_FUNCTION_PLAY) {
749                return ProcessInvokePlay(pFrom, request);
750        } else if (functionName == RM_INVOKE_FUNCTION_PAUSERAW) {
751                return ProcessInvokePauseRaw(pFrom, request);
752        } else if (functionName == RM_INVOKE_FUNCTION_PAUSE) {
753                return ProcessInvokePause(pFrom, request);
754        } else if (functionName == RM_INVOKE_FUNCTION_SEEK) {
755                return ProcessInvokeSeek(pFrom, request);
756        } else if (functionName == RM_INVOKE_FUNCTION_CLOSESTREAM) {
757                return ProcessInvokeCloseStream(pFrom, request);
758        } else if (functionName == RM_INVOKE_FUNCTION_RELEASESTREAM) {
759                return ProcessInvokeReleaseStream(pFrom, request);
760        } else if (functionName == RM_INVOKE_FUNCTION_DELETESTREAM) {
761                return ProcessInvokeDeleteStream(pFrom, request);
762        } else if (functionName == RM_INVOKE_FUNCTION_RESULT) {
763                return ProcessInvokeResult(pFrom, request);
764        } else if (functionName == RM_INVOKE_FUNCTION_ERROR) {
765                return ProcessInvokeResult(pFrom, request);
766        } else if (functionName == RM_INVOKE_FUNCTION_ONSTATUS) {
767                return ProcessInvokeOnStatus(pFrom, request);
768        } else if (functionName == RM_INVOKE_FUNCTION_FCPUBLISH) {
769                return ProcessInvokeFCPublish(pFrom, request);
770        } else if (functionName == RM_INVOKE_FUNCTION_FCSUBSCRIBE) {
771                return ProcessInvokeFCSubscribe(pFrom, request);
772        } else if (functionName == RM_INVOKE_FUNCTION_GETSTREAMLENGTH) {
773                return ProcessInvokeGetStreamLength(pFrom, request);
774        } else if (functionName == RM_INVOKE_FUNCTION_ONBWDONE) {
775                return ProcessInvokeOnBWDone(pFrom, request);
776        } else if (functionName == RM_INVOKE_FUNCTION_CHECKBANDWIDTH) {
777                return ProcessInvokeCheckBandwidth(pFrom, request);
778        } else {
779                return ProcessInvokeGeneric(pFrom, request);
780        }
781}
782
783bool BaseRTMPAppProtocolHandler::ProcessInvokeConnect(BaseRTMPProtocol *pFrom,
784                Variant & request) {
785        //1. Send the channel specific messages
786        Variant response = GenericMessageFactory::GetWinAckSize(2500000);
787        if (!SendRTMPMessage(pFrom, response)) {
788                FATAL("Unable to send message to client");
789                return false;
790        }
791        response = GenericMessageFactory::GetPeerBW(2500000, RM_PEERBW_TYPE_DYNAMIC);
792        if (!SendRTMPMessage(pFrom, response)) {
793                FATAL("Unable to send message to client");
794                return false;
795        }
796
797        //2. Initialize stream 0
798        response = StreamMessageFactory::GetUserControlStreamBegin(0);
799        if (!SendRTMPMessage(pFrom, response)) {
800                FATAL("Unable to send message to client");
801                return false;
802        }
803
804        //3. Send the connect result
805        response = ConnectionMessageFactory::GetInvokeConnectResult(request);
806        if (!SendRTMPMessage(pFrom, response)) {
807                FATAL("Unable to send message to client");
808                return false;
809        }
810
811        //4. Send onBWDone
812        response = GenericMessageFactory::GetInvokeOnBWDone(1024 * 8);
813        if (!SendRTMPMessage(pFrom, response)) {
814                FATAL("Unable to send message to client");
815                return false;
816        }
817
818        //5. Done
819        return true;
820}
821
822bool BaseRTMPAppProtocolHandler::ProcessInvokeCreateStream(BaseRTMPProtocol *pFrom,
823                Variant &request) {
824        uint32_t id = 0;
825
826        //1. Create the neutral stream
827        if (pFrom->CreateNeutralStream(id) == NULL) {
828                FATAL("Unable to create stream");
829                return false;
830        }
831
832        //2. Send the response
833        Variant response = StreamMessageFactory::GetInvokeCreateStreamResult(request, id);
834        return SendRTMPMessage(pFrom, response);
835}
836
837bool BaseRTMPAppProtocolHandler::ProcessInvokePublish(BaseRTMPProtocol *pFrom,
838                Variant &request) {
839        //1. gather the required data from the request
840        if ((M_INVOKE_PARAM(request, 1) != V_STRING) && (M_INVOKE_PARAM(request, 1) != V_BOOL)) {
841                FATAL("Invalid request:\n%s", STR(request.ToString()));
842                return false;
843        }
844
845        if (M_INVOKE_PARAM(request, 1) == V_BOOL) {
846                if ((bool)M_INVOKE_PARAM(request, 1) != false) {
847                        FATAL("Invalid request:\n%s", STR(request.ToString()));
848                        return false;
849                }
850                FINEST("Closing stream via publish(false)");
851                return pFrom->CloseStream(VH_SI(request), true);
852        }
853
854        string streamName = M_INVOKE_PARAM(request, 1);
855        string::size_type pos = streamName.find("?");
856        if (pos != string::npos) {
857                streamName = streamName.substr(0, pos);
858        }
859        trim(streamName);
860        if (streamName == "") {
861                Variant response = StreamMessageFactory::GetInvokeOnStatusStreamPublishBadName(request, streamName);
862                return pFrom->SendMessage(response);
863        }
864        M_INVOKE_PARAM(request, 1) = streamName;
865
866        //2. Check to see if we are allowed to create inbound streams
867        if (!(bool)pFrom->GetCustomParameters()["authState"]["canPublish"]) {
868                Variant response = StreamMessageFactory::GetInvokeOnStatusStreamPublishBadName(request, streamName);
869                return pFrom->SendMessage(response);
870        }
871
872
873        bool recording = (M_INVOKE_PARAM(request, 2) == RM_INVOKE_PARAMS_PUBLISH_TYPERECORD);
874        bool appending = (M_INVOKE_PARAM(request, 2) == RM_INVOKE_PARAMS_PUBLISH_TYPEAPPEND);
875        //      FINEST("Try to publish stream %s.%s",
876        //                      STR(streamName), (recording || appending) ? " Also record/append it" : "");
877
878        //3. Test to see if this stream name is already published somewhere
879        if (GetApplication()->GetAllowDuplicateInboundNetworkStreams()) {
880                map<uint32_t, BaseStream *> existingStreams =
881                                GetApplication()->GetStreamsManager()->FindByTypeByName(
882                                ST_IN_NET_RTMP, streamName, false, false);
883                if (existingStreams.size() > 0) {
884                        if (!(bool)pFrom->GetCustomParameters()["authState"]["canOverrideStreamName"]) {
885                                WARN("Unable to override stream %s because this connection doesn't have the rights",
886                                                STR(streamName));
887                                Variant response = StreamMessageFactory::GetInvokeOnStatusStreamPublishBadName(
888                                                request, streamName);
889                                return pFrom->SendMessage(response);
890                        } else {
891
892                                FOR_MAP(existingStreams, uint32_t, BaseStream *, i) {
893                                        InNetRTMPStream *pTempStream = (InNetRTMPStream *) MAP_VAL(i);
894                                        if (pTempStream->GetProtocol() != NULL) {
895                                                WARN("Overriding stream R%u:U%u with name %s from connection %u",
896                                                                pTempStream->GetRTMPStreamId(),
897                                                                pTempStream->GetUniqueId(),
898                                                                STR(pTempStream->GetName()),
899                                                                pTempStream->GetProtocol()->GetId());
900                                                ((BaseRTMPProtocol *) pTempStream->GetProtocol())->CloseStream(
901                                                                pTempStream->GetRTMPStreamId(), true);
902                                        }
903                                }
904                        }
905                }
906        } else {
907                if (!GetApplication()->StreamNameAvailable(streamName, pFrom)) {
908                        WARN("Stream name %s already occupied and application doesn't allow duplicated inbound network streams",
909                                        STR(streamName));
910                        Variant response = StreamMessageFactory::GetInvokeOnStatusStreamPublishBadName(
911                                        request, streamName);
912                        return pFrom->SendMessage(response);
913                }
914        }
915
916        //4. Create the network inbound stream
917        InNetRTMPStream *pInNetRTMPStream = pFrom->CreateINS(VH_CI(request),
918                        VH_SI(request), streamName);
919        if (pInNetRTMPStream == NULL) {
920                FATAL("Unable to create inbound stream");
921                return false;
922        }
923
924        //6. Get the list of waiting subscribers
925        map<uint32_t, BaseOutStream *> subscribedOutStreams =
926                        GetApplication()->GetStreamsManager()->GetWaitingSubscribers(
927                        streamName, pInNetRTMPStream->GetType(), true);
928        //FINEST("subscribedOutStreams count: %"PRIz"u", subscribedOutStreams.size());
929
930
931        //7. Bind the waiting subscribers
932
933        FOR_MAP(subscribedOutStreams, uint32_t, BaseOutStream *, i) {
934                BaseOutStream *pBaseOutStream = MAP_VAL(i);
935                pBaseOutStream->Link(pInNetRTMPStream);
936        }
937
938        //8. Send the streamPublished status message
939        if (!pInNetRTMPStream->SendOnStatusStreamPublished()) {
940                FATAL("Unable to send OnStatusStreamPublished");
941                return false;
942        }
943
944        //9. Start recording if necessary
945        if (recording || appending) {
946                Variant meta = GetMetaData(streamName, false);
947
948                BaseOutFileStream *pOutFileStream = CreateOutFileStream(pFrom, meta, appending);
949                if (!pOutFileStream || !pInNetRTMPStream->Record(pOutFileStream)) {
950                        FATAL("Unable to bind the recording stream");
951                        return false;
952                }
953        }
954
955        //10. Done
956        return true;
957}
958
959bool BaseRTMPAppProtocolHandler::ProcessInvokeSeek(BaseRTMPProtocol *pFrom,
960                Variant & request) {
961        //1. Read stream index and offset in millisecond
962        uint32_t streamId = VH_SI(request);
963        double timeOffset = 0.0;
964        if (M_INVOKE_PARAM(request, 1) == _V_NUMERIC)
965                timeOffset = M_INVOKE_PARAM(request, 1);
966
967        //2. Find the corresponding outbound stream
968        BaseOutNetRTMPStream *pOutNetRTMPStream = NULL;
969        map<uint32_t, BaseStream *> possibleStreams = GetApplication()->
970                        GetStreamsManager()->FindByProtocolIdByType(pFrom->GetId(), ST_OUT_NET_RTMP, true);
971
972        FOR_MAP(possibleStreams, uint32_t, BaseStream *, i) {
973                if (((BaseOutNetRTMPStream *) MAP_VAL(i))->GetRTMPStreamId() == streamId) {
974                        pOutNetRTMPStream = (BaseOutNetRTMPStream *) MAP_VAL(i);
975                        break;
976                }
977        }
978        if (pOutNetRTMPStream == NULL) {
979                FATAL("No out stream");
980                return false;
981        }
982
983        return pOutNetRTMPStream->Seek(timeOffset);
984}
985
986bool BaseRTMPAppProtocolHandler::ProcessInvokePlay(BaseRTMPProtocol *pFrom,
987                Variant & request) {
988        //1. Minimal validation
989        if (M_INVOKE_PARAM(request, 1) != V_STRING) {
990                FATAL("Invalid request:\n%s", STR(request.ToString()));
991                return false;
992        }
993
994        //2. Close any streams left open
995        if (!pFrom->CloseStream(VH_SI(request), true)) {
996                FATAL("Unable to close stream %u:%u",
997                                pFrom->GetId(),
998                                (uint32_t) VH_SI(request));
999                return false;
1000        }
1001
1002        //3. Gather required data from the request
1003        string streamName = M_INVOKE_PARAM(request, 1);
1004        double startTime = -2;
1005        double length = -1;
1006        if (M_INVOKE_PARAM(request, 2) == V_DOUBLE)
1007                startTime = M_INVOKE_PARAM(request, 2);
1008        if (M_INVOKE_PARAM(request, 3) == V_DOUBLE)
1009                length = M_INVOKE_PARAM(request, 3);
1010
1011        if (startTime < 0 && startTime != -2 && startTime != -1)
1012                startTime = -2;
1013
1014        if (length < 0 && length != -1)
1015                length = -1;
1016
1017        //4. Get the metadata for the stream
1018        Variant metadata = GetMetaData(streamName, true);
1019        if (metadata != V_MAP) {
1020                FATAL("Unable to get metadata");
1021                return false;
1022        }
1023
1024        INFO("Play request for stream name `%s`. Start: %.0f; length: %.0f",
1025                        STR(streamName), startTime, length);
1026
1027        //6. bind the network outbound stream to the inbound stream
1028        //depending on the type of the outbound stream
1029        switch ((int32_t) startTime) {
1030                case -2: //live or recorded
1031                {
1032                        bool linked = false;
1033
1034                        //7. try to link to live stream
1035                        if (!TryLinkToLiveStream(pFrom, VH_SI(request), streamName, linked)) {
1036                                FATAL("Unable to link streams");
1037                                return false;
1038                        }
1039                        if (linked)
1040                                return true;
1041
1042                        //8. try to link to file stream
1043                        if (!TryLinkToFileStream(pFrom, VH_SI(request), metadata, streamName,
1044                                        startTime, length, linked)) {
1045                                FATAL("Unable to link streams");
1046                                return false;
1047                        }
1048                        if (linked)
1049                                return true;
1050
1051                        //9. Ok, no live/file stream. Just wait for the live stream now...
1052                        WARN("We are going to wait for the live stream `%s`", STR(streamName));
1053                        BaseOutNetRTMPStream * pBaseOutNetRTMPStream = pFrom->CreateONS(
1054                                        VH_SI(request),
1055                                        streamName,
1056                                        ST_IN_NET_RTMP);
1057                        request["waitForLiveStream"] = (bool)true;
1058                        request["streamName"] = streamName;
1059                        return pBaseOutNetRTMPStream != NULL;
1060                }
1061                case -1: //only live
1062                {
1063                        bool linked = false;
1064
1065                        //10. try to link to live stream
1066                        if (!TryLinkToLiveStream(pFrom, VH_SI(request), streamName, linked)) {
1067                                FATAL("Unable to link streams");
1068                                return false;
1069                        }
1070                        if (linked)
1071                                return true;
1072
1073                        //11. Ok, no live/file stream. Just wait for the live stream now...
1074                        WARN("We are going to wait for the live stream `%s`", STR(streamName));
1075                        BaseOutNetRTMPStream * pBaseOutNetRTMPStream = pFrom->CreateONS(
1076                                        VH_SI(request),
1077                                        streamName,
1078                                        ST_IN_NET_RTMP);
1079                        request["waitForLiveStream"] = (bool)true;
1080                        request["streamName"] = streamName;
1081                        return pBaseOutNetRTMPStream != NULL;
1082                }
1083                default: //only recorded
1084                {
1085                        //12. Perform little adjustment on metadata
1086                        if (metadata[META_MEDIA_TYPE] == MEDIA_TYPE_LIVE_OR_FLV) {
1087                                metadata[META_MEDIA_TYPE] = MEDIA_TYPE_FLV;
1088                        }
1089
1090                        //13. try to link to file stream
1091                        bool linked = false;
1092                        if (!TryLinkToFileStream(pFrom, VH_SI(request), metadata, streamName,
1093                                        startTime, length, linked)) {
1094                                FATAL("Unable to link streams");
1095                                return false;
1096                        }
1097
1098                        return linked;
1099                }
1100        }
1101}
1102
1103bool BaseRTMPAppProtocolHandler::ProcessInvokePauseRaw(BaseRTMPProtocol *pFrom,
1104                Variant & request) {
1105        //1. Read stream index and offset in millisecond
1106        uint32_t streamId = VH_SI(request);
1107        /*double timeOffset = 0.0;
1108        if ((VariantType) M_INVOKE_PARAM(request, 1) == V_DOUBLE)
1109                timeOffset = M_INVOKE_PARAM(request, 1);*/
1110
1111        //2. Find the corresponding outbound stream
1112        BaseOutNetRTMPStream *pBaseOutNetRTMPStream = NULL;
1113        map<uint32_t, BaseStream *> possibleStreams = GetApplication()->
1114                        GetStreamsManager()->FindByProtocolIdByType(pFrom->GetId(), ST_OUT_NET_RTMP, true);
1115
1116        FOR_MAP(possibleStreams, uint32_t, BaseStream *, i) {
1117                if (((BaseOutNetRTMPStream *) MAP_VAL(i))->GetRTMPStreamId() == streamId) {
1118                        pBaseOutNetRTMPStream = (BaseOutNetRTMPStream *) MAP_VAL(i);
1119                        break;
1120                }
1121        }
1122        if (pBaseOutNetRTMPStream == NULL) {
1123                FATAL("No out stream");
1124                return false;
1125        }
1126
1127        //3. get the operation
1128        bool pause = M_INVOKE_PARAM(request, 1);
1129        if (pause) {
1130                //4. Pause it
1131                return pBaseOutNetRTMPStream->Pause();
1132        } else {
1133                double timeOffset = 0.0;
1134                if (M_INVOKE_PARAM(request, 2) == _V_NUMERIC)
1135                        timeOffset = M_INVOKE_PARAM(request, 2);
1136
1137                //8. Perform seek
1138                if (!pBaseOutNetRTMPStream->Seek(timeOffset)) {
1139                        FATAL("Unable to seek");
1140                        return false;
1141                }
1142
1143                //9. Resume
1144                return pBaseOutNetRTMPStream->Resume();
1145        }
1146}
1147
1148bool BaseRTMPAppProtocolHandler::ProcessInvokePause(BaseRTMPProtocol *pFrom,
1149                Variant & request) {
1150        return ProcessInvokePauseRaw(pFrom, request);
1151}
1152
1153bool BaseRTMPAppProtocolHandler::ProcessInvokeCloseStream(BaseRTMPProtocol *pFrom,
1154                Variant & request) {
1155        return pFrom->CloseStream(VH_SI(request), true);
1156}
1157
1158bool BaseRTMPAppProtocolHandler::ProcessInvokeReleaseStream(BaseRTMPProtocol *pFrom,
1159                Variant & request) {
1160        //1. Attempt to find the stream
1161        map<uint32_t, BaseStream *> streams = GetApplication()->GetStreamsManager()->
1162                        FindByProtocolIdByName(pFrom->GetId(), M_INVOKE_PARAM(request, 1), false);
1163        uint32_t streamId = 0;
1164        if (streams.size() > 0) {
1165                //2. Is this the correct kind?
1166                if (TAG_KIND_OF(MAP_VAL(streams.begin())->GetType(), ST_IN_NET_RTMP)) {
1167                        //3. get the rtmp stream id
1168                        InNetRTMPStream *pInNetRTMPStream = (InNetRTMPStream *) MAP_VAL(streams.begin());
1169                        streamId = pInNetRTMPStream->GetRTMPStreamId();
1170
1171                        //4. close the stream
1172                        if (!pFrom->CloseStream(streamId, true)) {
1173                                FATAL("Unable to close stream");
1174                                return true;
1175                        }
1176                }
1177        }
1178
1179        if (streamId > 0) {
1180                //5. Send the response
1181                Variant response = StreamMessageFactory::GetInvokeReleaseStreamResult(3,
1182                                streamId, M_INVOKE_ID(request), streamId);
1183                if (!pFrom->SendMessage(response)) {
1184                        FATAL("Unable to send message to client");
1185                        return false;
1186                }
1187        } else {
1188                Variant response =
1189                                StreamMessageFactory::GetInvokeReleaseStreamErrorNotFound(request);
1190                if (!pFrom->SendMessage(response)) {
1191                        FATAL("Unable to send message to client");
1192                        return false;
1193                }
1194        }
1195
1196        //3. Done
1197        return true;
1198}
1199
1200bool BaseRTMPAppProtocolHandler::ProcessInvokeDeleteStream(BaseRTMPProtocol *pFrom,
1201                Variant & request) {
1202        return pFrom->CloseStream((uint32_t) M_INVOKE_PARAM(request, 1), false);
1203}
1204
1205bool BaseRTMPAppProtocolHandler::ProcessInvokeOnStatus(BaseRTMPProtocol *pFrom,
1206                Variant & request) {
1207        if ((!NeedsToPullExternalStream(pFrom))
1208                        && (!NeedsToPushLocalStream(pFrom))) {
1209                WARN("Default implementation of ProcessInvokeOnStatus in application %s: Request:\n%s",
1210                                STR(GetApplication()->GetName()),
1211                                STR(request.ToString()));
1212                return true;
1213        }
1214
1215        //1. Test and see if this connection is an outbound RTMP connection
1216        //and get a pointer to it
1217        if (pFrom->GetType() != PT_OUTBOUND_RTMP) {
1218                FATAL("This is not an outbound connection");
1219                return false;
1220        }
1221        OutboundRTMPProtocol *pProtocol = (OutboundRTMPProtocol *) pFrom;
1222
1223        //2. Validate the request
1224        if (M_INVOKE_PARAM(request, 1) != V_MAP) {
1225                FATAL("invalid onStatus:\n%s", STR(request.ToString()));
1226                return false;
1227        }
1228        if (M_INVOKE_PARAM(request, 1)["code"] != V_STRING) {
1229                FATAL("invalid onStatus:\n%s", STR(request.ToString()));
1230                return false;
1231        }
1232
1233        //6. Get our hands on streaming parameters
1234        string path = "";
1235        if (NeedsToPullExternalStream(pFrom))
1236                path = "externalStreamConfig";
1237        else
1238                path = "localStreamConfig";
1239        Variant &parameters = pFrom->GetCustomParameters()["customParameters"][path];
1240
1241        if (NeedsToPullExternalStream(pFrom)) {
1242                if (M_INVOKE_PARAM(request, 1)["code"] != "NetStream.Play.Start") {
1243                        WARN("onStatus message ignored:\n%s", STR(request.ToString()));
1244                        return true;
1245                }
1246                if (!GetApplication()->StreamNameAvailable(parameters["localStreamName"],
1247                                pProtocol)) {
1248                        WARN("Stream name %s already occupied and application doesn't allow duplicated inbound network streams",
1249                                        STR(parameters["localStreamName"]));
1250                        return false;
1251                }
1252                InNetRTMPStream *pStream = pProtocol->CreateINS(VH_CI(request),
1253                                VH_SI(request), parameters["localStreamName"]);
1254                if (pStream == NULL) {
1255                        FATAL("Unable to create stream");
1256                        return false;
1257                }
1258
1259                map<uint32_t, BaseOutStream *> waitingSubscribers =
1260                                GetApplication()->GetStreamsManager()->GetWaitingSubscribers(
1261                                pStream->GetName(),
1262                                pStream->GetType(), true);
1263
1264                FOR_MAP(waitingSubscribers, uint32_t, BaseOutStream *, i) {
1265                        pStream->Link(MAP_VAL(i));
1266                }
1267        } else {
1268                if (M_INVOKE_PARAM(request, 1)["code"] != "NetStream.Publish.Start") {
1269                        WARN("onStatus message ignored:\n%s", STR(request.ToString()));
1270                        return true;
1271                }
1272
1273                BaseInStream *pBaseInStream =
1274                                (BaseInStream *) GetApplication()->GetStreamsManager()->FindByUniqueId(
1275                                (uint32_t) parameters["localUniqueStreamId"]);
1276                if (pBaseInStream == NULL) {
1277                        FATAL("Unable to find the inbound stream with id %u",
1278                                        (uint32_t) parameters["localUniqueStreamId"]);
1279                        return false;
1280                }
1281
1282                //5. Create the network outbound stream
1283                BaseOutNetRTMPStream *pBaseOutNetRTMPStream = pProtocol->CreateONS(
1284                                VH_SI(request),
1285                                pBaseInStream->GetName(),
1286                                pBaseInStream->GetType());
1287                if (pBaseOutNetRTMPStream == NULL) {
1288                        FATAL("Unable to create outbound stream");
1289                        return false;
1290                }
1291                pBaseOutNetRTMPStream->SetSendOnStatusPlayMessages(false);
1292
1293                //6. Link and return
1294                if (!pBaseInStream->Link((BaseOutNetStream*) pBaseOutNetRTMPStream)) {
1295                        FATAL("Unable to link streams");
1296                        return false;
1297                }
1298        }
1299
1300        return true;
1301}
1302
1303bool BaseRTMPAppProtocolHandler::ProcessInvokeFCPublish(BaseRTMPProtocol *pFrom,
1304                Variant & request) {
1305
1306        //1. Get the stream name
1307        string streamName = M_INVOKE_PARAM(request, 1);
1308        string::size_type pos = streamName.find("?");
1309        if (pos != string::npos) {
1310                streamName = streamName.substr(0, pos);
1311        }
1312        trim(streamName);
1313        if (streamName == "") {
1314                Variant response = StreamMessageFactory::GetInvokeOnStatusStreamPublishBadName(request, streamName);
1315                return pFrom->SendMessage(response);
1316        }
1317        M_INVOKE_PARAM(request, 1) = streamName;
1318
1319        //2. Send the release stream response. Is identical to the one
1320        //needed by this fucker
1321        //TODO: this is a nasty hack
1322        Variant response = StreamMessageFactory::GetInvokeReleaseStreamResult(3, 0,
1323                        M_INVOKE_ID(request), 0);
1324        if (!pFrom->SendMessage(response)) {
1325                FATAL("Unable to send message to client");
1326                return false;
1327        }
1328
1329        //3. send the onFCPublish message
1330        response = StreamMessageFactory::GetInvokeOnFCPublish(3, 0, 0, false, 0,
1331                        RM_INVOKE_PARAMS_ONSTATUS_CODE_NETSTREAMPUBLISHSTART, streamName);
1332        if (!SendRTMPMessage(pFrom, response)) {
1333                FATAL("Unable to send message to client");
1334                return false;
1335        }
1336
1337        //4. Done
1338        return true;
1339}
1340
1341bool BaseRTMPAppProtocolHandler::ProcessInvokeFCSubscribe(BaseRTMPProtocol *pFrom,
1342                Variant &request) {
1343
1344        //1. Get the stream name
1345        string streamName = M_INVOKE_PARAM(request, 1);
1346        string::size_type pos = streamName.find("?");
1347        if (pos != string::npos) {
1348                streamName = streamName.substr(0, pos);
1349        }
1350        trim(streamName);
1351        if (streamName == "") {
1352                Variant response = StreamMessageFactory::GetInvokeOnStatusStreamPublishBadName(request, streamName);
1353                return pFrom->SendMessage(response);
1354        }
1355        M_INVOKE_PARAM(request, 1) = streamName;
1356
1357        //2. send the onFCSubscribe message
1358        Variant response = StreamMessageFactory::GetInvokeOnFCSubscribe(3, 0, 0, false, 0,
1359                        RM_INVOKE_PARAMS_ONSTATUS_CODE_NETSTREAMPLAYSTART, streamName);
1360        if (!SendRTMPMessage(pFrom, response)) {
1361                FATAL("Unable to send message to client");
1362                return false;
1363        }
1364
1365        //3. Done
1366        return true;
1367}
1368
1369bool BaseRTMPAppProtocolHandler::ProcessInvokeGetStreamLength(BaseRTMPProtocol *pFrom,
1370                Variant & request) {
1371        Variant metadata = GetMetaData(M_INVOKE_PARAM(request, 1), true);
1372        Variant params;
1373        params[(uint32_t) 0] = Variant();
1374        if ((VariantType) metadata == V_MAP)
1375                params[(uint32_t) 1] = (double) metadata[META_FILE_DURATION] / 1000.00;
1376        else
1377                params[(uint32_t) 1] = 0.0;
1378
1379        Variant response = GenericMessageFactory::GetInvokeResult(request, params);
1380        if (!SendRTMPMessage(pFrom, response)) {
1381                FATAL("Unable to send message to client");
1382                return false;
1383        }
1384        return true;
1385}
1386
1387bool BaseRTMPAppProtocolHandler::ProcessInvokeOnBWDone(BaseRTMPProtocol *pFrom,
1388                Variant &request) {
1389        //WARN("ProcessInvokeOnBWDone");
1390        return true;
1391}
1392
1393bool BaseRTMPAppProtocolHandler::ProcessInvokeCheckBandwidth(BaseRTMPProtocol *pFrom,
1394                Variant &request) {
1395        if (!_enableCheckBandwidth) {
1396                WARN("checkBandwidth is disabled.");
1397                return true;
1398        }
1399        if (!SendRTMPMessage(pFrom, _onBWCheckMessage, true)) {
1400                FATAL("Unable to send message to flash player");
1401                return false;
1402        }
1403        double temp;
1404        GETCLOCKS(temp);
1405        pFrom->GetCustomParameters()["lastOnnBWCheckMessage"] = temp;
1406        return true;
1407}
1408
1409bool BaseRTMPAppProtocolHandler::ProcessInvokeGeneric(BaseRTMPProtocol *pFrom,
1410                Variant & request) {
1411        WARN("Default implementation of ProcessInvokeGeneric: Request: %s",
1412                        STR(M_INVOKE_FUNCTION(request)));
1413        Variant response = GenericMessageFactory::GetInvokeCallFailedError(request);
1414        return SendRTMPMessage(pFrom, response);
1415}
1416
1417bool BaseRTMPAppProtocolHandler::ProcessInvokeResult(BaseRTMPProtocol *pFrom,
1418                Variant & result) {
1419        if (!MAP_HAS2(_resultMessageTracking, pFrom->GetId(), M_INVOKE_ID(result))) {
1420                return true;
1421        }
1422        if (!ProcessInvokeResult(pFrom,
1423                        _resultMessageTracking[pFrom->GetId()][M_INVOKE_ID(result)],
1424                        result)) {
1425                FATAL("Unable to process result");
1426                return false;
1427        }
1428        _resultMessageTracking[pFrom->GetId()].erase(M_INVOKE_ID(result));
1429        return true;
1430}
1431
1432bool BaseRTMPAppProtocolHandler::ProcessInvokeResult(BaseRTMPProtocol *pFrom,
1433                Variant &request, Variant & response) {
1434        string functionName = M_INVOKE_FUNCTION(request);
1435        if (functionName == RM_INVOKE_FUNCTION_CONNECT) {
1436                return ProcessInvokeConnectResult(pFrom, request, response);
1437        } else if (functionName == RM_INVOKE_FUNCTION_CREATESTREAM) {
1438                return ProcessInvokeCreateStreamResult(pFrom, request, response);
1439        } else if (functionName == RM_INVOKE_FUNCTION_FCSUBSCRIBE) {
1440                return ProcessInvokeFCSubscribeResult(pFrom, request, response);
1441        } else if (functionName == RM_INVOKE_FUNCTION_ONBWCHECK) {
1442                return ProcessInvokeOnBWCheckResult(pFrom, request, response);
1443        } else {
1444                return ProcessInvokeGenericResult(pFrom, request, response);
1445        }
1446}
1447
1448bool BaseRTMPAppProtocolHandler::ProcessInvokeConnectResult(BaseRTMPProtocol *pFrom,
1449                Variant &request, Variant & response) {
1450        //1. Do we need to push/pull a stream?
1451        if ((!NeedsToPullExternalStream(pFrom))
1452                        && (!NeedsToPushLocalStream(pFrom))) {
1453                WARN("Default implementation of ProcessInvokeConnectResult: Request:\n%s\nResponse:\n%s",
1454                                STR(request.ToString()),
1455                                STR(response.ToString()));
1456                return true;
1457        }
1458
1459        //2. See if the result is OK or not
1460        if (M_INVOKE_FUNCTION(response) != RM_INVOKE_FUNCTION_RESULT) {
1461                if ((M_INVOKE_FUNCTION(response) != RM_INVOKE_FUNCTION_ERROR)
1462                                || (M_INVOKE_PARAMS(response) != V_MAP)
1463                                || (M_INVOKE_PARAMS(response).MapSize() < 2)
1464                                || (M_INVOKE_PARAM(response, 1) != V_MAP)
1465                                || (!M_INVOKE_PARAM(response, 1).HasKey("level"))
1466                                || (M_INVOKE_PARAM(response, 1)["level"] != V_STRING)
1467                                || (M_INVOKE_PARAM(response, 1)["level"] != "error")
1468                                || (!M_INVOKE_PARAM(response, 1).HasKey("code"))
1469                                || (M_INVOKE_PARAM(response, 1)["code"] != V_STRING)
1470                                || (M_INVOKE_PARAM(response, 1)["code"] != "NetConnection.Connect.Rejected")
1471                                || (!M_INVOKE_PARAM(response, 1).HasKey("description"))
1472                                || (M_INVOKE_PARAM(response, 1)["description"] != V_STRING)
1473                                || (M_INVOKE_PARAM(response, 1)["description"] == "")
1474                                ) {
1475                        FATAL("Connect failed:\n%s", STR(response.ToString()));
1476                        return false;
1477                }
1478                string description = M_INVOKE_PARAM(response, 1)["description"];
1479                vector<string> parts;
1480                split(description, "?", parts);
1481                if (parts.size() != 2) {
1482                        FATAL("Connect failed:\n%s", STR(response.ToString()));
1483                        return false;
1484                }
1485                description = parts[1];
1486                map<string, string> params = mapping(description, "&", "=", true);
1487                if ((!MAP_HAS1(params, "reason"))
1488                                || (!MAP_HAS1(params, "user"))
1489                                || (!MAP_HAS1(params, "salt"))
1490                                || (!MAP_HAS1(params, "challenge"))
1491                                || (params["reason"] != "needauth")
1492                                ) {
1493                        FATAL("Connect failed:\n%s", STR(response.ToString()));
1494                        return false;
1495                }
1496                if (!MAP_HAS1(params, "opaque"))
1497                        params["opaque"] = "";
1498
1499                Variant &customParameters = pFrom->GetCustomParameters();
1500                Variant &streamConfig = NeedsToPullExternalStream(pFrom)
1501                                ? customParameters["customParameters"]["externalStreamConfig"]
1502                                : customParameters["customParameters"]["localStreamConfig"];
1503
1504                FOR_MAP(params, string, string, i) {
1505                        streamConfig["auth"][MAP_KEY(i)] = MAP_VAL(i);
1506                }
1507
1508                return false;
1509        }
1510        if (M_INVOKE_PARAM(response, 1) != V_MAP) {
1511                FATAL("Connect failed:\n%s", STR(response.ToString()));
1512                return false;
1513        }
1514        if (M_INVOKE_PARAM(response, 1)["code"] != V_STRING) {
1515                FATAL("Connect failed:\n%s", STR(response.ToString()));
1516                return false;
1517        }
1518        if (M_INVOKE_PARAM(response, 1)["code"] != "NetConnection.Connect.Success") {
1519                FATAL("Connect failed:\n%s", STR(response.ToString()));
1520                return false;
1521        }
1522
1523        //3. Create the createStream request
1524        Variant createStreamRequest = StreamMessageFactory::GetInvokeCreateStream();
1525
1526        //4. Send it
1527        if (!SendRTMPMessage(pFrom, createStreamRequest, true)) {
1528                FATAL("Unable to send request:\n%s", STR(createStreamRequest.ToString()));
1529                return false;
1530        }
1531
1532        //5. Done
1533        return true;
1534}
1535
1536bool BaseRTMPAppProtocolHandler::ProcessInvokeCreateStreamResult(BaseRTMPProtocol *pFrom,
1537                Variant &request, Variant & response) {
1538        //1. Do we need to push/pull a stream?
1539        if ((!NeedsToPullExternalStream(pFrom))
1540                        && (!NeedsToPushLocalStream(pFrom))) {
1541                WARN("Default implementation of ProcessInvokeCreateStreamResult: Request:\n%s\nResponse:\n%s",
1542                                STR(request.ToString()),
1543                                STR(response.ToString()));
1544                return true;
1545        }
1546
1547        //2. Test and see if this connection is an outbound RTMP connection
1548        //and get a pointer to it
1549        if (pFrom->GetType() != PT_OUTBOUND_RTMP) {
1550                FATAL("This is not an outbound connection");
1551                return false;
1552        }
1553        OutboundRTMPProtocol *pProtocol = (OutboundRTMPProtocol *) pFrom;
1554
1555        //3. Test the response
1556        if (M_INVOKE_FUNCTION(response) != RM_INVOKE_FUNCTION_RESULT) {
1557                FATAL("createStream failed:\n%s", STR(response.ToString()));
1558                return false;
1559        }
1560        if (M_INVOKE_PARAM(response, 1) != _V_NUMERIC) {
1561                FATAL("createStream failed:\n%s", STR(response.ToString()));
1562                return false;
1563        }
1564
1565        //4. Get the assigned stream ID
1566        uint32_t rtmpStreamId = (uint32_t) M_INVOKE_PARAM(response, 1);
1567
1568        //5. Create the neutral stream
1569        RTMPStream *pStream = pProtocol->CreateNeutralStream(rtmpStreamId);
1570        if (pStream == NULL) {
1571                FATAL("Unable to create neutral stream");
1572                return false;
1573        }
1574
1575
1576        //6. Get our hands on streaming parameters
1577        string path = "";
1578        if (NeedsToPullExternalStream(pFrom))
1579                path = "externalStreamConfig";
1580        else
1581                path = "localStreamConfig";
1582        Variant &parameters = pFrom->GetCustomParameters()["customParameters"][path];
1583
1584        //7. Create publish/play request
1585        Variant publishPlayRequest;
1586        Variant fcPublish;
1587        Variant fcSubscribe;
1588        if (NeedsToPullExternalStream(pFrom)) {
1589                publishPlayRequest = StreamMessageFactory::GetInvokePlay(3, rtmpStreamId,
1590                                parameters["uri"]["documentWithFullParameters"], -2, -1);
1591                fcSubscribe = StreamMessageFactory::GetInvokeFCSubscribe(
1592                                parameters["uri"]["documentWithFullParameters"]);
1593        } else {
1594                string targetStreamType = "";
1595                if (parameters["targetStreamType"] == V_STRING) {
1596                        targetStreamType = (string) parameters["targetStreamType"];
1597                }
1598                if ((targetStreamType != "live")
1599                                && (targetStreamType != "record")
1600                                && (targetStreamType != "append")) {
1601                        targetStreamType = "live";
1602                }
1603                publishPlayRequest = StreamMessageFactory::GetInvokePublish(3, rtmpStreamId,
1604                                parameters["targetStreamName"], targetStreamType);
1605                fcPublish = StreamMessageFactory::GetInvokeFCPublish(parameters["targetStreamName"]);
1606        }
1607
1608        //8. Send it
1609        if (fcSubscribe != V_NULL) {
1610                if (!SendRTMPMessage(pFrom, fcSubscribe, false)) {
1611                        FATAL("Unable to send request:\n%s", STR(fcSubscribe.ToString()));
1612                        return false;
1613                }
1614        }
1615        if (!SendRTMPMessage(pFrom, publishPlayRequest, true)) {
1616                FATAL("Unable to send request:\n%s", STR(publishPlayRequest.ToString()));
1617                return false;
1618        }
1619        if (fcPublish != V_NULL) {
1620                if (!SendRTMPMessage(pFrom, fcPublish, false)) {
1621                        FATAL("Unable to send request:\n%s", STR(fcPublish.ToString()));
1622                        return false;
1623                }
1624        }
1625
1626        //9. Done
1627        return true;
1628}
1629
1630bool BaseRTMPAppProtocolHandler::ProcessInvokeFCSubscribeResult(BaseRTMPProtocol *pFrom,
1631                Variant &request, Variant &response) {
1632        return true;
1633}
1634
1635bool BaseRTMPAppProtocolHandler::ProcessInvokeOnBWCheckResult(BaseRTMPProtocol *pFrom,
1636                Variant &request, Variant &response) {
1637        double now;
1638        GETCLOCKS(now);
1639        double startTime = (double) pFrom->GetCustomParameters()["lastOnnBWCheckMessage"];
1640        double totalTime = (now - startTime) / (double) CLOCKS_PER_SECOND;
1641        double speed = (double) ONBWCHECK_SIZE / totalTime / 1024.0 * 8.0;
1642        Variant message = GenericMessageFactory::GetInvokeOnBWDone(speed);
1643        return SendRTMPMessage(pFrom, message);
1644}
1645
1646bool BaseRTMPAppProtocolHandler::ProcessInvokeGenericResult(BaseRTMPProtocol *pFrom,
1647                Variant &request, Variant &response) {
1648        WARN("Invoke result not yet implemented: Request:\n%s\nResponse:\n%s",
1649                        STR(request.ToString()),
1650                        STR(response.ToString()));
1651        return true;
1652}
1653
1654bool BaseRTMPAppProtocolHandler::AuthenticateInboundAdobe(BaseRTMPProtocol *pFrom,
1655                Variant & request, Variant &authState) {
1656        if (!authState.HasKey("stage"))
1657                authState["stage"] = "inProgress";
1658
1659        if (authState["stage"] == "authenticated") {
1660                return true;
1661        }
1662
1663        if (authState["stage"] != "inProgress") {
1664                FATAL("This protocol in not in the authenticating mode");
1665                return false;
1666        }
1667
1668        //1. Validate the type of request
1669        if ((uint8_t) VH_MT(request) != RM_HEADER_MESSAGETYPE_INVOKE) {
1670                FINEST("This is not an invoke. Wait for it...");
1671                return true;
1672        }
1673
1674        //2. Validate the invoke function name
1675        if (M_INVOKE_FUNCTION(request) != RM_INVOKE_FUNCTION_CONNECT) {
1676                FATAL("This is not a connect invoke");
1677                return false;
1678        }
1679
1680        //3. Pick up the first param in the invoke
1681        Variant connectParams = M_INVOKE_PARAM(request, 0);
1682        if (connectParams != V_MAP) {
1683                FATAL("first invoke param must be a map");
1684                return false;
1685        }
1686
1687        //4. pick up the agent name
1688        if ((!connectParams.HasKey(RM_INVOKE_PARAMS_CONNECT_FLASHVER))
1689                        || (connectParams[RM_INVOKE_PARAMS_CONNECT_FLASHVER] != V_STRING)) {
1690                WARN("Incorrect user agent");
1691                authState["stage"] = "authenticated";
1692                authState["canPublish"] = (bool)false;
1693                authState["canOverrideStreamName"] = (bool)false;
1694                return true;
1695        }
1696        string flashVer = (string) connectParams[RM_INVOKE_PARAMS_CONNECT_FLASHVER];
1697
1698        //6. test the flash ver against the allowed encoder agents
1699        if (!_adobeAuthSettings[CONF_APPLICATION_AUTH_ENCODER_AGENTS].HasKey(flashVer)) {
1700                WARN("This agent is not on the list of allowed encoders: `%s`", STR(flashVer));
1701                authState["stage"] = "authenticated";
1702                authState["canPublish"] = (bool)false;
1703                authState["canOverrideStreamName"] = (bool)false;
1704                return true;
1705        }
1706
1707        //7. pick up the tcUrl from the first param
1708        if ((!connectParams.HasKey(RM_INVOKE_PARAMS_CONNECT_APP))
1709                        || (connectParams[RM_INVOKE_PARAMS_CONNECT_APP] != V_STRING)) {
1710                WARN("Incorrect app url");
1711                authState["stage"] = "authenticated";
1712                authState["canPublish"] = (bool)false;
1713                authState["canOverrideStreamName"] = (bool)false;
1714                return true;
1715        }
1716        string appUrl = (string) connectParams[RM_INVOKE_PARAMS_CONNECT_APP];
1717
1718        //8. Split the URI into parts
1719        vector<string> appUrlParts;
1720        split(appUrl, "?", appUrlParts);
1721
1722        //9. Based on the parts count, we are in a specific stage
1723        switch (appUrlParts.size()) {
1724                case 1:
1725                {
1726                        //bare request. We need to tell him that he needs auth
1727                        Variant response = ConnectionMessageFactory::GetInvokeConnectError(request,
1728                                        "[ AccessManager.Reject ] : [ code=403 need auth; authmod=adobe ] : ");
1729                        if (!pFrom->SendMessage(response)) {
1730                                FATAL("Unable to send message");
1731                                return false;
1732                        }
1733
1734                        response = ConnectionMessageFactory::GetInvokeClose();
1735                        if (!pFrom->SendMessage(response)) {
1736                                FATAL("Unable to send message");
1737                                return false;
1738                        }
1739
1740                        pFrom->GracefullyEnqueueForDelete();
1741                        return true;
1742                }
1743                case 2:
1744                {
1745                        map<string, string> params = mapping(appUrlParts[1], "&", "=", false);
1746                        if ((!MAP_HAS1(params, "authmod")) || (!MAP_HAS1(params, "user"))) {
1747                                WARN("Invalid appUrl: %s", STR(appUrl));
1748                                authState["stage"] = "authenticated";
1749                                authState["canPublish"] = (bool)false;
1750                                authState["canOverrideStreamName"] = (bool)false;
1751                                return true;
1752                        }
1753
1754                        string user = params["user"];
1755
1756                        if (MAP_HAS1(params, "challenge")
1757                                        && MAP_HAS1(params, "response")
1758                                        && MAP_HAS1(params, "opaque")) {
1759                                string challenge = params["challenge"];
1760                                string response = params["response"];
1761                                string opaque = params["opaque"];
1762                                string password = GetAuthPassword(user);
1763                                if (password == "") {
1764                                        WARN("No such user: `%s`", STR(user));
1765                                        Variant response = ConnectionMessageFactory::GetInvokeConnectError(request,
1766                                                        "[ AccessManager.Reject ] : [ authmod=adobe ] : ?reason=authfailed&opaque=vgoAAA==");
1767                                        if (!pFrom->SendMessage(response)) {
1768                                                FATAL("Unable to send message");
1769                                                return false;
1770                                        }
1771
1772                                        response = ConnectionMessageFactory::GetInvokeClose();
1773                                        if (!pFrom->SendMessage(response)) {
1774                                                FATAL("Unable to send message");
1775                                                return false;
1776                                        }
1777
1778                                        pFrom->GracefullyEnqueueForDelete();
1779                                        return true;
1780                                }
1781
1782                                string str1 = user + _adobeAuthSalt + password;
1783                                string hash1 = b64(md5(str1, false));
1784                                string str2 = hash1 + opaque + challenge;
1785                                string wanted = b64(md5(str2, false));
1786
1787                                if (response == wanted) {
1788                                        authState["stage"] = "authenticated";
1789                                        authState["canPublish"] = (bool)true;
1790                                        authState["canOverrideStreamName"] = (bool)true;
1791                                        WARN("User `%s` authenticated", STR(user));
1792                                        return true;
1793                                } else {
1794                                        WARN("Invalid password for user `%s`", STR(user));
1795                                        Variant response = ConnectionMessageFactory::GetInvokeConnectError(request,
1796                                                        "[ AccessManager.Reject ] : [ authmod=adobe ] : ?reason=authfailed&opaque=vgoAAA==");
1797                                        if (!pFrom->SendMessage(response)) {
1798                                                FATAL("Unable to send message");
1799                                                return false;
1800                                        }
1801
1802                                        response = ConnectionMessageFactory::GetInvokeClose();
1803                                        if (!pFrom->SendMessage(response)) {
1804                                                FATAL("Unable to send message");
1805                                                return false;
1806                                        }
1807
1808                                        pFrom->GracefullyEnqueueForDelete();
1809                                        return true;
1810                                }
1811                        } else {
1812                                string challenge = generateRandomString(6) + "==";
1813                                string opaque = challenge;
1814                                string description = "[ AccessManager.Reject ] : [ authmod=adobe ] : ?reason=needauth&user=%s&salt=%s&challenge=%s&opaque=%s";
1815
1816                                description = format(description, STR(user), STR(_adobeAuthSalt),
1817                                                STR(challenge), STR(opaque));
1818
1819                                Variant response = ConnectionMessageFactory::GetInvokeConnectError(request, description);
1820                                if (!pFrom->SendMessage(response)) {
1821                                        FATAL("Unable to send message");
1822                                        return false;
1823                                }
1824
1825                                response = ConnectionMessageFactory::GetInvokeClose();
1826                                if (!pFrom->SendMessage(response)) {
1827                                        FATAL("Unable to send message");
1828                                        return false;
1829                                }
1830
1831                                pFrom->GracefullyEnqueueForDelete();
1832                                return true;
1833                        }
1834                }
1835                default:
1836                {
1837                        FATAL("Invalid appUrl: %s", STR(appUrl));
1838                        return false;
1839                }
1840        }
1841}
1842
1843string BaseRTMPAppProtocolHandler::GetAuthPassword(string user) {
1844#ifndef HAS_LUA
1845        ASSERT("Lua is not supported by the current build of the server. Adobe authentication needs lua support");
1846        return "";
1847#endif
1848        string usersFile = _adobeAuthSettings[CONF_APPLICATION_AUTH_USERS_FILE];
1849        string fileName;
1850        string extension;
1851        splitFileName(usersFile, fileName, extension);
1852
1853        double modificationDate = getFileModificationDate(usersFile);
1854        if (modificationDate == 0) {
1855                FATAL("Unable to get last modification date for file %s", STR(usersFile));
1856                return "";
1857        }
1858
1859        if (modificationDate != _lastUsersFileUpdate) {
1860                _users.Reset();
1861                if (!ReadLuaFile(usersFile, "users", _users)) {
1862                        FATAL("Unable to read users file: `%s`", STR(usersFile));
1863                        return "";
1864                }
1865                _lastUsersFileUpdate = modificationDate;
1866        }
1867
1868        if ((VariantType) _users != V_MAP) {
1869                FATAL("Invalid users file: `%s`", STR(usersFile));
1870                return "";
1871        }
1872
1873        if (_users.HasKey(user)) {
1874                if ((VariantType) _users[user] == V_STRING) {
1875                        return _users[user];
1876                } else {
1877                        FATAL("Invalid users file: `%s`", STR(usersFile));
1878                        return "";
1879                }
1880        } else {
1881                FATAL("User `%s` not present in users file: `%s`",
1882                                STR(user),
1883                                STR(usersFile));
1884                return "";
1885        }
1886}
1887
1888Variant BaseRTMPAppProtocolHandler::GetMetaData(string streamName,
1889                bool extractInnerMetadata) {
1890        Variant result;
1891        //1. Store the original request and the keyframe seek flag
1892        result[META_REQUESTED_STREAM_NAME] = streamName;
1893        result[CONF_APPLICATION_KEYFRAMESEEK] = (bool)_keyframeSeek;
1894        result[CONF_APPLICATION_CLIENTSIDEBUFFER] = (int32_t) _clientSideBuffer;
1895        result[CONF_APPLICATION_SEEKGRANULARITY] = _seekGranularity;
1896        result[CONF_APPLICATION_RENAMEBADFILES] = (bool)_renameBadFiles;
1897        result[CONF_APPLICATION_EXTERNSEEKGENERATOR] = (bool)_externSeekGenerator;
1898
1899        //2.Determine the media type
1900        vector<string> parts;
1901        split(streamName, ":", parts);
1902        if (parts.size() != 1 && parts.size() != 2 && parts.size() != 5) {
1903                FATAL("Invalid stream name format: %s", STR(streamName));
1904                return Variant();
1905        }
1906
1907        if (parts.size() == 1) {
1908                result[META_MEDIA_TYPE] = MEDIA_TYPE_LIVE_OR_FLV;
1909        } else {
1910                //some other type
1911                result[META_MEDIA_TYPE] = lowerCase(parts[0]);
1912        }
1913
1914        //3. Establish the final file name we are searching for. If the
1915        //media type is missing, assume we are looking for a flv file
1916        string searchFor = "";
1917        if (result[META_MEDIA_TYPE] == MEDIA_TYPE_LIVE_OR_FLV)
1918                searchFor = parts[0] + ".flv";
1919        else if (result[META_MEDIA_TYPE] == MEDIA_TYPE_MP3)
1920                searchFor = parts[1] + ".mp3";
1921        else
1922                searchFor = parts[1];
1923
1924        result[META_SERVER_FILE_NAME] = searchFor;
1925        result[META_SERVER_MEDIA_DIR] = _mediaFolder;
1926
1927        if (searchFor[0] == PATH_SEPARATOR) {
1928                string mediaFolderNormalizedPath = normalizePath(_mediaFolder, "");
1929                if (searchFor.find(mediaFolderNormalizedPath) == 0) {
1930                        result[META_SERVER_FULL_PATH] = searchFor;
1931                } else {
1932                        result[META_SERVER_FULL_PATH] = "";
1933                }
1934        } else {
1935                result[META_SERVER_FULL_PATH] = normalizePath(_mediaFolder, searchFor);
1936        }
1937
1938        if (!result.HasKey(META_SERVER_FULL_PATH))
1939                result[META_SERVER_FULL_PATH] = "";
1940
1941        //6. Test to see if we need to continue with load/cache the metadata
1942        if (result[META_SERVER_FULL_PATH] == "")
1943                return result;
1944
1945        //7. Load the rest of the metadata from a cache or load it from file and
1946        //cache it after that
1947        string metaPath = (string) result[META_SERVER_FULL_PATH] + "."MEDIA_TYPE_META;
1948        string seekPath = (string) result[META_SERVER_FULL_PATH] + "."MEDIA_TYPE_SEEK;
1949        bool regenerateFiles = true;
1950        if (fileExists(metaPath) && fileExists(seekPath)) {
1951                StreamCapabilities capabilities;
1952                string originalServerFullPath = result[META_SERVER_FULL_PATH];
1953                regenerateFiles =
1954                                (getFileModificationDate(metaPath) < getFileModificationDate(result[META_SERVER_FULL_PATH]))
1955                                || (getFileModificationDate(seekPath) < getFileModificationDate(result[META_SERVER_FULL_PATH]))
1956                                || (!Variant::DeserializeFromXmlFile(metaPath, result))
1957                                || (!StreamCapabilities::Deserialize(seekPath, capabilities));
1958                regenerateFiles |=
1959                                (!result.HasKeyChain(V_STRING, false, 1, META_SERVER_FULL_PATH))
1960                                || (result[META_SERVER_FULL_PATH] != originalServerFullPath)
1961                                || (!result.HasKeyChain(V_BOOL, false, 1, CONF_APPLICATION_KEYFRAMESEEK))
1962                                || ((bool) result[CONF_APPLICATION_KEYFRAMESEEK] != _keyframeSeek)
1963                                || (!result.HasKeyChain(V_INT32, false, 1, CONF_APPLICATION_CLIENTSIDEBUFFER))
1964                                || ((int32_t) result[CONF_APPLICATION_CLIENTSIDEBUFFER] != _clientSideBuffer)
1965                                || (!result.HasKeyChain(V_UINT32, false, 1, CONF_APPLICATION_SEEKGRANULARITY))
1966                                || ((uint32_t) result[CONF_APPLICATION_SEEKGRANULARITY] != _seekGranularity);
1967                if (regenerateFiles) {
1968                        result[META_SERVER_FULL_PATH] = originalServerFullPath;
1969                        result[CONF_APPLICATION_KEYFRAMESEEK] = (bool)_keyframeSeek;
1970                        result[CONF_APPLICATION_CLIENTSIDEBUFFER] = (int32_t) _clientSideBuffer;
1971                        result[CONF_APPLICATION_SEEKGRANULARITY] = _seekGranularity;
1972                }
1973        }
1974
1975        if (!regenerateFiles) {
1976                result[META_REQUESTED_STREAM_NAME] = streamName;
1977                return result;
1978        }
1979
1980        //8. We either have a bad meta file or we don't have it at all. Build it
1981        if (extractInnerMetadata) {
1982                if (!InFileRTMPStream::ResolveCompleteMetadata(result)) {
1983                        FATAL("Unable to get metadata. Partial result:\n%s",
1984                                        STR(result.ToString()));
1985                        return Variant();
1986                }
1987        }
1988
1989        //9. Save it
1990        if (!result.SerializeToXmlFile(metaPath)) {
1991                WARN("Unable to serialize meta file %s. Content was: %s",
1992                                STR(metaPath), STR(result.ToString()));
1993        }
1994        return result;
1995}
1996
1997bool BaseRTMPAppProtocolHandler::OpenClientSharedObject(BaseRTMPProtocol *pFrom,
1998                string soName) {
1999        if (pFrom->GetType() != PT_OUTBOUND_RTMP) {
2000                FATAL("Incorrect protocol type for opening SO");
2001                return false;
2002        }
2003
2004        //1. Check and see if tha SO is already opened
2005        if (pFrom->GetSO(soName) != NULL) {
2006                FATAL("SO already present");
2007                return false;
2008        }
2009
2010        //2. prepare the open command
2011        Variant message = SOMessageFactory::GetSharedObject(3, 0, 0, false, soName,
2012                        1, false);
2013        SOMessageFactory::AddSOPrimitiveConnect(message);
2014
2015        //3. Send it
2016        if (!SendRTMPMessage(pFrom, message)) {
2017                FATAL("Unable to send SO message");
2018                return false;
2019        }
2020
2021        //4. Create the SO
2022        if (!pFrom->CreateSO(soName)) {
2023                FATAL("CreateSO failed");
2024                return false;
2025        }
2026
2027        //5. Done
2028        return true;
2029}
2030
2031bool BaseRTMPAppProtocolHandler::SendRTMPMessage(BaseRTMPProtocol *pTo,
2032                Variant message, bool trackResponse) {
2033        switch ((uint8_t) VH_MT(message)) {
2034                case RM_HEADER_MESSAGETYPE_INVOKE:
2035                {
2036                        if ((M_INVOKE_FUNCTION(message) == RM_INVOKE_FUNCTION_RESULT)
2037                                        || (M_INVOKE_FUNCTION(message) == RM_INVOKE_FUNCTION_ERROR)) {
2038                                return pTo->SendMessage(message);
2039                        } else {
2040                                uint32_t invokeId = 0;
2041                                if (!MAP_HAS1(_nextInvokeId, pTo->GetId())) {
2042                                        FATAL("Unable to get next invoke ID");
2043                                        return false;
2044                                }
2045                                if (trackResponse) {
2046                                        invokeId = _nextInvokeId[pTo->GetId()];
2047                                        _nextInvokeId[pTo->GetId()] = invokeId + 1;
2048                                        M_INVOKE_ID(message) = invokeId;
2049                                        //do not store stupid useless amount of data needed by onbwcheck
2050                                        if (M_INVOKE_FUNCTION(message) == RM_INVOKE_FUNCTION_ONBWCHECK)
2051                                                _resultMessageTracking[pTo->GetId()][invokeId] = _onBWCheckStrippedMessage;
2052                                        else
2053                                                _resultMessageTracking[pTo->GetId()][invokeId] = message;
2054                                } else {
2055                                        M_INVOKE_ID(message) = (uint32_t) 0;
2056                                }
2057                                return pTo->SendMessage(message);
2058                        }
2059                }
2060                case RM_HEADER_MESSAGETYPE_FLEXSTREAMSEND:
2061                case RM_HEADER_MESSAGETYPE_WINACKSIZE:
2062                case RM_HEADER_MESSAGETYPE_PEERBW:
2063                case RM_HEADER_MESSAGETYPE_USRCTRL:
2064                case RM_HEADER_MESSAGETYPE_ABORTMESSAGE:
2065                case RM_HEADER_MESSAGETYPE_SHAREDOBJECT:
2066                {
2067                        return pTo->SendMessage(message);
2068                }
2069                default:
2070                {
2071                        FATAL("Unable to send message:\n%s", STR(message.ToString()));
2072                        return false;
2073                }
2074        }
2075}
2076
2077BaseOutFileStream* BaseRTMPAppProtocolHandler::CreateOutFileStream(
2078                BaseRTMPProtocol *pFrom, Variant &meta, bool append) {
2079        //1. Compute the file name
2080        string fileName = meta[META_SERVER_MEDIA_DIR];
2081        fileName += (string) meta[META_SERVER_FILE_NAME];
2082        FINEST("fileName: %s", STR(fileName));
2083
2084        //2. Delete the old file
2085        if (append) {
2086                WARN("append not supported yet. File will be overwritten");
2087        }
2088        deleteFile(fileName);
2089
2090        if ((meta[META_MEDIA_TYPE] == MEDIA_TYPE_LIVE_OR_FLV) ||
2091                        (meta[META_MEDIA_TYPE] == MEDIA_TYPE_FLV)) {
2092                return new OutFileRTMPFLVStream(pFrom,
2093                                GetApplication()->GetStreamsManager(), fileName);
2094        }
2095        if (meta[META_MEDIA_TYPE] == MEDIA_TYPE_MP4) {
2096                FATAL("Streaming to MP4 file not supported");
2097                return NULL;
2098        }
2099        FATAL("Media type not supported");
2100        return NULL;
2101}
2102
2103InNetRTMPStream *BaseRTMPAppProtocolHandler::CreateInNetStream(
2104                BaseRTMPProtocol *pFrom, uint32_t channelId, uint32_t streamId,
2105                string streamName) {
2106        return new InNetRTMPStream(pFrom,
2107                        GetApplication()->GetStreamsManager(), streamName, streamId, channelId);
2108
2109}
2110
2111string NormalizeStreamName(string streamName) {
2112        replace(streamName, "-", "_");
2113        replace(streamName, "?", "-");
2114        replace(streamName, "&", "-");
2115        replace(streamName, "=", "-");
2116        return streamName;
2117}
2118
2119bool BaseRTMPAppProtocolHandler::TryLinkToLiveStream(BaseRTMPProtocol *pFrom,
2120                uint32_t streamId, string streamName, bool &linked) {
2121        linked = false;
2122
2123        //1. Get get the short version of the stream name
2124        vector<string> parts;
2125        split(streamName, "?", parts);
2126        string shortName = parts[0];
2127
2128        //2. Search for the long version first
2129        map<uint32_t, BaseStream *> inboundStreams =
2130                        GetApplication()->GetStreamsManager()->FindByTypeByName(
2131                        ST_IN_NET, streamName, true, false);
2132
2133        //3. Search for the short version if necessary
2134        if (inboundStreams.size() == 0) {
2135                inboundStreams = GetApplication()->GetStreamsManager()->FindByTypeByName(
2136                                ST_IN_NET, shortName + "?", true, true);
2137        }
2138
2139        //4. Do we have some streams?
2140        if (inboundStreams.size() == 0) {
2141                WARN("No live streams found: `%s` or `%s`", STR(streamName), STR(shortName));
2142                return true;
2143        }
2144
2145        //5. Get the first stream in the inboundStreams
2146        BaseInNetStream *pBaseInNetStream = NULL;
2147
2148        FOR_MAP(inboundStreams, uint32_t, BaseStream *, i) {
2149                BaseInNetStream *pTemp = (BaseInNetStream *) MAP_VAL(i);
2150                if ((!pTemp->IsCompatibleWithType(ST_OUT_NET_RTMP_4_TS))
2151                                && (!pTemp->IsCompatibleWithType(ST_OUT_NET_RTMP_4_RTMP)))
2152                        continue;
2153                pBaseInNetStream = pTemp;
2154                break;
2155        }
2156        if (pBaseInNetStream == NULL) {
2157                WARN("No live streams found: `%s` or `%s`", STR(streamName), STR(shortName));
2158                return true;
2159        }
2160
2161        //6. Create the outbound stream
2162        BaseOutNetRTMPStream * pBaseOutNetRTMPStream = pFrom->CreateONS(streamId,
2163                        streamName, pBaseInNetStream->GetType());
2164        if (pBaseOutNetRTMPStream == NULL) {
2165                FATAL("Unable to create network outbound stream");
2166                return false;
2167        }
2168
2169        //7. Link them
2170        if (!pBaseInNetStream->Link(pBaseOutNetRTMPStream)) {
2171                FATAL("Link failed");
2172                return false;
2173        }
2174
2175        //8. Done
2176        linked = true;
2177        return true;
2178}
2179
2180bool BaseRTMPAppProtocolHandler::TryLinkToFileStream(BaseRTMPProtocol *pFrom,
2181                uint32_t streamId, Variant &metadata, string streamName, double startTime,
2182                double length, bool &linked) {
2183        linked = false;
2184
2185        //1. Try to create the in file streams
2186        InFileRTMPStream *pRTMPInFileStream = pFrom->CreateIFS(metadata);
2187        if (pRTMPInFileStream == NULL) {
2188                WARN("No file streams found: %s", STR(streamName));
2189                return true;
2190        }
2191
2192        //2. Try to create the out net stream
2193        BaseOutNetRTMPStream * pBaseOutNetRTMPStream = pFrom->CreateONS(
2194                        streamId, streamName, pRTMPInFileStream->GetType());
2195        if (pBaseOutNetRTMPStream == NULL) {
2196                FATAL("Unable to create network outbound stream");
2197                return false;
2198        }
2199
2200        //3. Link them
2201        if (!pRTMPInFileStream->Link(pBaseOutNetRTMPStream)) {
2202                FATAL("Link failed");
2203                return false;
2204        }
2205
2206        //4. Register it to the signaled streams
2207        pFrom->SignalONS(pBaseOutNetRTMPStream);
2208
2209        //5. Fire up the play routine
2210        if (!pRTMPInFileStream->Play(startTime, length)) {
2211                FATAL("Unable to start the playback");
2212                return false;
2213        }
2214
2215        //6. Done
2216        linked = true;
2217        return true;
2218}
2219
2220bool BaseRTMPAppProtocolHandler::NeedsToPullExternalStream(
2221                BaseRTMPProtocol *pFrom) {
2222        Variant &parameters = pFrom->GetCustomParameters();
2223        if (parameters != V_MAP)
2224                return false;
2225        if (!parameters.HasKey("customParameters"))
2226                return false;
2227        if (parameters["customParameters"] != V_MAP)
2228                return false;
2229        if (!parameters["customParameters"].HasKey("externalStreamConfig"))
2230                return false;
2231        if (parameters["customParameters"]["externalStreamConfig"] != V_MAP)
2232                return false;
2233        if (!parameters["customParameters"]["externalStreamConfig"].HasKey("uri"))
2234                return false;
2235        if (parameters["customParameters"]["externalStreamConfig"]["uri"] != V_MAP)
2236                return false;
2237        return true;
2238}
2239
2240bool BaseRTMPAppProtocolHandler::NeedsToPushLocalStream(BaseRTMPProtocol *pFrom) {
2241        Variant &parameters = pFrom->GetCustomParameters();
2242        if (parameters != V_MAP)
2243                return false;
2244        if (!parameters.HasKey("customParameters"))
2245                return false;
2246        if (parameters["customParameters"] != V_MAP)
2247                return false;
2248        if (!parameters["customParameters"].HasKey("localStreamConfig"))
2249                return false;
2250        if (parameters["customParameters"]["localStreamConfig"] != V_MAP)
2251                return false;
2252        if (!parameters["customParameters"]["localStreamConfig"].HasKey("targetUri"))
2253                return false;
2254        if (parameters["customParameters"]["localStreamConfig"]["targetUri"] != V_MAP)
2255                return false;
2256        return true;
2257}
2258
2259bool BaseRTMPAppProtocolHandler::PullExternalStream(BaseRTMPProtocol *pFrom) {
2260        //1. Get the stream configuration and the URI from it
2261        Variant &streamConfig = pFrom->GetCustomParameters()["customParameters"]["externalStreamConfig"];
2262
2263        //2. Issue the connect invoke
2264        return ConnectForPullPush(pFrom, "uri", streamConfig, true);
2265}
2266
2267bool BaseRTMPAppProtocolHandler::PushLocalStream(BaseRTMPProtocol *pFrom) {
2268        //1. Get the stream configuration and the URI from it
2269        Variant &streamConfig = pFrom->GetCustomParameters()["customParameters"]["localStreamConfig"];
2270
2271        //2. Issue the connect invoke
2272        return ConnectForPullPush(pFrom, "targetUri", streamConfig, false);
2273}
2274
2275bool BaseRTMPAppProtocolHandler::ConnectForPullPush(BaseRTMPProtocol *pFrom,
2276                string uriPath, Variant &streamConfig, bool isPull) {
2277        URI uri;
2278        if (!URI::FromVariant(streamConfig[uriPath], uri)) {
2279                FATAL("Unable to parse uri:\n%s", STR(streamConfig["targetUri"].ToString()));
2280                return false;
2281        }
2282
2283        //2. get the application name
2284        string appName = "";
2285        if (isPull) {
2286                appName = uri.documentPath();
2287        } else {
2288                appName = uri.fullDocumentPathWithParameters();
2289        }
2290        if (appName != "") {
2291                if (appName[0] == '/')
2292                        appName = appName.substr(1, appName.size() - 1);
2293                if (appName != "") {
2294                        if (appName[appName.size() - 1] == '/')
2295                                appName = appName.substr(0, appName.size() - 1);
2296                }
2297        }
2298        if (appName == "") {
2299                FATAL("Invalid uri: %s", STR(uri.fullUri()));
2300                return false;
2301        }
2302        string authInfo = "";
2303        if (uri.userName() != "") {
2304                if (streamConfig.HasKey("auth")) {
2305                        string user = uri.userName();
2306                        string password = uri.password();
2307                        string salt = streamConfig["auth"]["salt"];
2308                        string opaque = streamConfig["auth"]["opaque"];
2309                        string challenge = streamConfig["auth"]["challenge"];
2310                        string response = "";
2311                        if (opaque == "") {
2312                                string newChallenge = "c3VnaXB1bGFhZG9iZQ==";
2313                                response = b64(md5(b64(md5(user + salt + password, false)) + challenge + newChallenge, false));
2314                                authInfo = "authmod=adobe&user=" + uri.userName()
2315                                                + "&challenge=" + newChallenge
2316                                                + "&response=" + response
2317                                                + "&opaque=";
2318                        } else {
2319                                response = b64(md5(b64(md5(user + salt + password, false)) + opaque + challenge, false));
2320                                authInfo = "authmod=adobe&user=" + uri.userName()
2321                                                + "&challenge=" + challenge
2322                                                + "&opaque=" + opaque
2323                                                + "&salt=" + salt
2324                                                + "&response=" + response;
2325                        }
2326                } else {
2327                        authInfo = "authmod=adobe&user=" + uri.userName();
2328                }
2329        }
2330        if (authInfo != "") {
2331                if (appName.find("?") == string::npos)
2332                        appName += "?" + authInfo;
2333                else
2334                        appName += "&" + authInfo;
2335        }
2336
2337        //4. Get the user agent
2338        string userAgent = "";
2339        if (streamConfig["emulateUserAgent"] == V_STRING) {
2340                userAgent = (string) streamConfig["emulateUserAgent"];
2341        }
2342        if (userAgent == "") {
2343                userAgent = HTTP_HEADERS_SERVER_US;
2344        }
2345
2346        //5. Get swfUrl and pageUrl
2347        string tcUrl = "";
2348        if (streamConfig["tcUrl"] == V_STRING)
2349                tcUrl = (string) streamConfig["tcUrl"];
2350        if (tcUrl == "")
2351                tcUrl = uri.fullUri();
2352        if (authInfo != "") {
2353                if (tcUrl.find("?") == string::npos)
2354                        tcUrl += "?" + authInfo;
2355                else
2356                        tcUrl += "&" + authInfo;
2357        }
2358
2359        string swfUrl = "";
2360        if (streamConfig["swfUrl"] == V_STRING)
2361                swfUrl = (string) streamConfig["swfUrl"];
2362        if (swfUrl == "")
2363                swfUrl = uri.fullUri();
2364        if (authInfo != "") {
2365                if (swfUrl.find("?") == string::npos)
2366                        swfUrl += "?" + authInfo;
2367                else
2368                        swfUrl += "&" + authInfo;
2369        }
2370
2371        string pageUrl = "";
2372        if (streamConfig["pageUrl"] == V_STRING) {
2373                pageUrl = (string) streamConfig["pageUrl"];
2374        }
2375
2376        //6. Prepare the connect request
2377        Variant connectRequest = ConnectionMessageFactory::GetInvokeConnect(
2378                        appName, //string appName
2379                        tcUrl, //string tcUrl
2380                        3191, //double audioCodecs
2381                        239, //double capabilities
2382                        userAgent, //string flashVer
2383                        false, //bool fPad
2384                        pageUrl, //string pageUrl
2385                        swfUrl, //string swfUrl
2386                        252, //double videoCodecs
2387                        1, //double videoFunction
2388                        0 //double objectEncoding
2389                        );
2390
2391        //7. Send it
2392        if (!SendRTMPMessage(pFrom, connectRequest, true)) {
2393                FATAL("Unable to send request:\n%s", STR(connectRequest.ToString()));
2394                return false;
2395        }
2396
2397        return true;
2398}
2399
2400#endif /* HAS_PROTOCOL_RTMP */
Note: See TracBrowser for help on using the repository browser.