source: trunk/Cocoa/F-Script Anywhere/Source/SCPatch/Common/SCPatchMessenger.cp @ 407

Last change on this file since 407 was 407, checked in by Nicholas Riley, 12 years ago

Fix a compiler warning.

File size: 12.8 KB
Line 
1#include <Carbon/Carbon.h>
2#include <sys/time.h>
3#include <syslog.h> 
4#include <unistd.h> 
5#include "SCPatchPrivate.h"
6#include "SCPatchMessenger.h"
7
8//-------------------------------------------------------------------------------------------------------------
9SCPatchMessenger::SCPatchMessenger(void)
10{
11        mThreaded = false;
12        mMessagePort = NULL;
13        mMessageSource = NULL;
14        pthread_mutex_init(&mMessageQueueMutex, NULL);
15        mMessageRetryInterval = DEFAULT_MESSAGE_RETRY_INTERVAL;
16        mMessageRetryLimit = DEFAULT_MESSAGE_RETRY_LIMIT;
17       
18        mMessageQueueDict = CFDictionaryCreateMutable(NULL, 0, &kCFTypeDictionaryKeyCallBacks, NULL);
19        mMessageThreadDict = CFDictionaryCreateMutable(NULL, 0, NULL, &kCFTypeDictionaryValueCallBacks);
20}
21
22//-------------------------------------------------------------------------------------------------------------
23SCPatchMessenger::~SCPatchMessenger(void)
24{
25        CFRelease(mMessageQueueDict);
26        CFRelease(mMessageThreadDict);
27}
28
29//-------------------------------------------------------------------------------------------------------------
30OSErr SCPatchMessenger::StartListening(CFStringRef bundleIdentifier, 
31                                                        Boolean isPatch, Boolean threaded)
32{
33        OSErr                                   err = noErr;
34        CFStringRef                             messagePortName;
35        CFMessagePortContext    messageContext = { 0 };
36        Boolean                                 messageShouldFree;
37        EventLoopRef                    mainLoop;
38       
39        mThreaded = threaded;
40       
41        // Create a port to receive messages
42        messageContext.info = this;
43       
44        if(isPatch)
45                messagePortName = CFStringCreateWithFormat(NULL, NULL, CFSTR("%@.%ld"), bundleIdentifier, getpid());
46        else
47                messagePortName = CFStringCreateCopy(NULL, bundleIdentifier);
48               
49        if(messagePortName == NULL)
50                return memFullErr;
51
52        if(messagePortName) 
53                mMessagePort = CFMessagePortCreateLocal(NULL, messagePortName, ReceiveCallback, &messageContext, &messageShouldFree);
54
55        CFRelease(messagePortName);
56
57        if(mMessagePort == NULL)
58                return destPortErr;
59               
60        // Set up to receive messages from the controller
61        if((mMessageSource = CFMessagePortCreateRunLoopSource(NULL, mMessagePort, 0)) != NULL &&
62           (mainLoop = GetMainEventLoop()) != NULL)
63        {
64                OSErr                   err;
65                AEDesc                  desc;
66                OSType                  sig = 'SCPM';
67
68                CFRunLoopAddSource((CFRunLoopRef)GetCFRunLoopFromEventLoop(mainLoop), mMessageSource, kCFRunLoopCommonModes);
69               
70                // Tell the patch code that it has officially been started.
71                err = AEBuildAppleEvent(kEventClassSCPatchMessenger, kEventSCPatchStart,
72                                                                typeApplSignature, &sig, sizeof(sig),
73                                                                kAutoGenerateReturnID, kAnyTransactionID,
74                                                                &desc, NULL, "");
75                ReceiveMessage(&desc);
76        }
77        else
78        {
79                CFRelease(mMessagePort);
80                mMessagePort = NULL;
81        }
82       
83        return err;
84}
85
86//-------------------------------------------------------------------------------------------------------------                                                 
87OSErr SCPatchMessenger::StopListening(void)
88{
89        EventLoopRef    mainLoop;
90        OSErr                   err;
91        AEDesc                  desc;
92        OSType                  sig = 'SCPM';
93
94        if(mMessageSource && (mainLoop = GetMainEventLoop()) != NULL)
95        {
96                // Tell the patch code that it's about to be stopped.
97                err = AEBuildAppleEvent(kEventClassSCPatchMessenger, kEventSCPatchStop,
98                                                                typeApplSignature, &sig, sizeof(sig),
99                                                                kAutoGenerateReturnID, kAnyTransactionID,
100                                                                &desc, NULL, "");
101                ReceiveMessage(&desc);
102
103                CFRunLoopRemoveSource((CFRunLoopRef)GetCFRunLoopFromEventLoop(mainLoop), mMessageSource, kCFRunLoopCommonModes);
104                CFRelease(mMessageSource);
105                mMessageSource = NULL;
106        }
107       
108        if(mMessagePort)
109        {
110                CFRelease(mMessagePort);
111                mMessagePort = NULL;
112        }
113        return noErr;
114}
115
116//-------------------------------------------------------------------------------------------------------------
117OSErr SCPatchMessenger::SendPing(CFStringRef bundleIdentifier, ProcessSerialNumber *psn)
118{
119        OSErr                   err;
120        AEDesc                  desc;
121        OSType                  sig = 'SCPM';
122
123        err = AEBuildAppleEvent(kSCMessageClass, kSCPing,
124                                                        typeApplSignature, &sig, sizeof(sig),
125                                                        kAutoGenerateReturnID, kAnyTransactionID,
126                                                        &desc, NULL, "");
127
128        if(err == noErr)
129                err = SendMessage(bundleIdentifier, psn, &desc, false);
130
131        return err;
132}
133
134//-------------------------------------------------------------------------------------------------------------
135OSErr SCPatchMessenger::SendMessage(CFStringRef bundleIdentifier,
136                                                ProcessSerialNumber *psn,
137                                                AEEventClass evtClass, AEEventID evtID,
138                                                const char *paramsFmt, ...)
139{
140        OSErr                   err;
141        AEDesc                  desc;
142        va_list                 args;
143        OSType                  sig = 'SCPM';
144
145        va_start(args, paramsFmt);
146        // fprintf(stderr, "Sending message %.4s:%.4s\n", &evtClass, &evtID);
147        err = vAEBuildAppleEvent(evtClass, evtID,
148                                                        typeApplSignature, &sig, sizeof(sig),
149                                                        kAutoGenerateReturnID, kAnyTransactionID,
150                                                        &desc, NULL, paramsFmt, args);
151        va_end(args);
152
153        if(err == noErr)
154                err = SendMessage(bundleIdentifier, psn, &desc, mThreaded);
155       
156        return err;
157}
158
159
160//-------------------------------------------------------------------------------------------------------------
161OSErr SCPatchMessenger::SendMessage(CFStringRef bundleIdentifier, 
162                                                ProcessSerialNumber *psn, 
163                                                const AppleEvent *theAE,
164                                                Boolean threaded)
165{
166        OSErr                           err;
167        pid_t                           pid = 0;
168        AEMessageInfo           *info;
169
170        if(psn && (err = GetProcessPID(psn, &pid)) != noErr)
171                return err;
172       
173        // If we don't succeed or get an error, there's not enough memory
174        err = memFullErr;
175       
176        if((info = (AEMessageInfo *)NewPtrClear(sizeof(AEMessageInfo))) != NULL &&
177           (info->descSize = AEGetDescDataSize(theAE)) > 0 && 
178       (info->descData = NewPtr(info->descSize)) != NULL &&
179       AEGetDescData(theAE, info->descData, info->descSize) == noErr)
180        {
181                if(pid)
182                        info->portName = CFStringCreateWithFormat(NULL, NULL, CFSTR("%@.%ld"), bundleIdentifier, pid);
183                else
184                        info->portName = CFStringCreateCopy(NULL, bundleIdentifier);
185               
186                if(info->portName)
187                {
188                        info->threaded = threaded;
189
190                        if(!threaded)
191                        {
192                                err = SendMessage(info);
193                                DisposePtr(info->descData);
194                                DisposePtr((Ptr)info);
195                        }
196                        else if((err = QueueMessage(info)) != noErr)
197                        {
198                                err = queueFull;
199                        }
200                       
201                }
202        }
203
204        return err;
205}
206
207//-------------------------------------------------------------------------------------------------------------
208OSErr SCPatchMessenger::SendMessage(AEMessageInfo *info)
209{
210        SInt32                          result = kCFMessagePortTransportError;
211        OSErr                           err = noErr;
212        CFDataRef                       dataRef;
213        CFMessagePortRef        portRef;
214
215        if((dataRef = CFDataCreate(NULL, (unsigned char*)info->descData, info->descSize)) != NULL)
216        {
217           if((portRef = CFMessagePortCreateRemote(NULL, info->portName)) != NULL)
218                {
219                        result = CFMessagePortSendRequest(portRef, kSCMessageClass, dataRef, info->threaded ? 0.01 : 5, 0, NULL, NULL);
220                        CFRelease(portRef);
221                }
222                else
223                {
224                        err = destPortErr;
225                }
226                CFRelease(dataRef);
227        }
228        else
229        {
230                err = memFullErr;
231        }
232       
233        // If we're not going to retry, if we've retried too many times, if the message was sent successfully, 
234        // or there was some unrecoverable error like destPortErr (no one's listening) give up and let the
235        // error pass to the thread loop so it can clean up.
236        if(!info->threaded  || info->retryCount >= mMessageRetryLimit || 
237           (err == noErr && result == kCFMessagePortSuccess) || 
238            err == destPortErr || err == memFullErr)
239        {
240                if(err == noErr && result != kCFMessagePortSuccess)
241                        err = (result == kCFMessagePortSendTimeout ? noResponseErr : networkErr);
242#if 0
243                if(err != noErr || result != kCFMessagePortSuccess)
244                        syslog(LOG_ALERT, "%d: Message to %s failed (err = %d result = %d)\n", getpid(), CFStringGetCStringPtr(info->portName, kCFStringEncodingMacRoman), err, result);
245                else
246                        syslog(LOG_ALERT, "%d: Message sent to %s\n", getpid(), CFStringGetCStringPtr(info->portName, kCFStringEncodingMacRoman));
247                       
248#endif
249        }
250        else if(info->threaded) // drop the sending thread's priority and hang around for mMessageRetryInterval
251        {
252                int                                     policy;
253                struct sched_param      param;
254                struct timeval          startTime, endTime;
255               
256                info->retryCount++;
257                pthread_getschedparam(pthread_self(), &policy, &param);
258                param.sched_priority = sched_get_priority_min(policy);
259                pthread_setschedparam(pthread_self(), policy, &param);
260                gettimeofday(&startTime, NULL);
261                for(;;)
262                {
263                        gettimeofday(&endTime, NULL);
264                        if((endTime.tv_sec - startTime.tv_sec) * 1000000L + (endTime.tv_usec - startTime.tv_usec) > 
265                           mMessageRetryInterval * 1000000)
266                        {
267                                // syslog(LOG_ALERT, "%d: waiting to resend on %s (%d)\n", 
268                                           // getpid(), CFStringGetCStringPtr(info->portName, kCFStringEncodingMacRoman), info->retryCount);
269                                break;
270                        }
271                }
272               
273                err = channelBusy;
274        }
275       
276        return err;
277}
278
279//-------------------------------------------------------------------------------------------------------------
280void *SCPatchMessenger::SendThreadEntryPoint(void *infoPtr)
281{
282        SCPatchMessenger        *self = (SCPatchMessenger *)infoPtr;
283        AEMessageInfo           *info;
284        OSStatus                        err = noErr;
285
286        while((info = self->GetNextMessage(pthread_self())) != NULL)
287        {
288                // This will block for mMessageRetryInterval if the send fails
289                err = self->SendMessage(info);
290               
291                // If we've retried too many times, if the message was sent successfully, or if there
292                //  was some unrecoverable error like destPortErr (no one's listening) give up.
293                if(err == noErr || err == destPortErr || err == memFullErr || err == noResponseErr)
294                {
295                        if((info = self->DequeueMessage(pthread_self())) != NULL)
296                        {
297                                CFRelease(info->portName);
298                                DisposePtr(info->descData);
299                                DisposePtr((Ptr)info);
300                        }
301                }
302        }
303       
304        return (void *)err;
305}
306
307//-------------------------------------------------------------------------------------------------------------
308OSErr SCPatchMessenger::QueueMessage(AEMessageInfo *info)
309{
310        CFMutableArrayRef       queue;
311        OSErr                           result = noErr;
312       
313        pthread_mutex_lock(&mMessageQueueMutex);
314       
315        if((queue = (CFMutableArrayRef)CFDictionaryGetValue(mMessageQueueDict, info->portName)) == NULL)
316        {
317                if((queue = CFArrayCreateMutable(NULL, 0, NULL)) != NULL)
318                {
319                        pthread_t       messageThread;
320
321                        CFDictionaryAddValue(mMessageQueueDict, info->portName, queue);
322                        pthread_create(&messageThread, NULL, SendThreadEntryPoint, (void *)this);
323                        pthread_detach(messageThread);
324                        CFDictionaryAddValue(mMessageThreadDict, messageThread, info->portName);
325                }
326        }
327       
328        if(queue)
329                CFArrayAppendValue(queue, info);
330        else
331                result = queueFull;
332
333        pthread_mutex_unlock(&mMessageQueueMutex);
334        return result;
335}
336
337//-------------------------------------------------------------------------------------------------------------
338AEMessageInfo *SCPatchMessenger::GetNextMessage(pthread_t threadID)
339{
340        CFMutableArrayRef       queue;
341        CFStringRef                     queueName;
342        AEMessageInfo           *result = NULL;
343       
344        pthread_mutex_lock(&mMessageQueueMutex);
345
346        if((queueName = (CFStringRef)CFDictionaryGetValue(mMessageThreadDict, threadID)) != NULL)
347                if((queue = (CFMutableArrayRef)CFDictionaryGetValue(mMessageQueueDict, queueName)) != NULL)
348                        if(CFArrayGetCount(queue) > 0)
349                                result = (AEMessageInfo *)CFArrayGetValueAtIndex(queue, 0);
350
351        pthread_mutex_unlock(&mMessageQueueMutex);
352       
353        return result;
354}
355
356//-------------------------------------------------------------------------------------------------------------
357AEMessageInfo *SCPatchMessenger::DequeueMessage(pthread_t threadID)
358{
359        CFMutableArrayRef       queue;
360        CFStringRef                     queueName;
361        Boolean                         stopThread = false;
362        AEMessageInfo           *result = NULL;
363       
364        pthread_mutex_lock(&mMessageQueueMutex);
365
366        if((queueName = (CFStringRef)CFDictionaryGetValue(mMessageThreadDict, threadID)) != NULL &&
367           (queue = (CFMutableArrayRef)CFDictionaryGetValue(mMessageQueueDict, queueName)) != NULL)
368        {
369                if(CFArrayGetCount(queue) > 0)
370                {
371                        result = (AEMessageInfo *)CFArrayGetValueAtIndex(queue, 0);
372                        CFArrayRemoveValueAtIndex(queue, 0);
373                }
374                if(CFArrayGetCount(queue) == 0)
375                {
376                        CFDictionaryRemoveValue(mMessageThreadDict, threadID);
377                        CFDictionaryRemoveValue(mMessageQueueDict, queueName);
378                        stopThread = true;
379                }
380        }
381
382        pthread_mutex_unlock(&mMessageQueueMutex);
383       
384        if(stopThread)
385                pthread_cancel(threadID);
386       
387        return result;
388}
389
390//-------------------------------------------------------------------------------------------------------------
391CFDataRef SCPatchMessenger::ReceiveCallback(CFMessagePortRef local, 
392                                                SInt32 message, 
393                                                CFDataRef dataRef, 
394                                                void *info)
395{
396        #pragma unused(local)
397       
398        SCPatchMessenger        *self = (SCPatchMessenger *)info;
399        Ptr                                     descData = NULL;
400        AEDesc                          desc;
401        long                            length;
402       
403        // fprintf(stderr, "%d: Receiving started\n", getpid());
404        if(message == kSCMessageClass)
405        {
406            if((length = CFDataGetLength(dataRef)) > 0 && (descData = NewPtr(length)) != NULL)
407            {
408                        CFDataGetBytes(dataRef, CFRangeMake(0, length), (unsigned char *)descData);
409
410                        if(AECreateDesc(typeAppleEvent, descData, length, &desc) == noErr)
411                        {
412                                // fprintf(stderr, "%d: handling message\n", getpid());
413                                if (self->HandleMessage(&desc) != noErr)
414                                        self->ReceiveMessage(&desc);
415                                // fprintf(stderr, "%d: done handling message\n", getpid());
416                                AEDisposeDesc(&desc);
417                        }
418                        else
419                        {
420                                fprintf(stderr, "%d: Error %d building AppleEvent", getpid(), memFullErr);
421                        }
422                }
423        }
424
425        if(descData)
426                DisposePtr(descData);
427       
428        // fprintf(stderr, "%d: Receiving finished\n", getpid());
429        return NULL;
430}
431
Note: See TracBrowser for help on using the repository browser.