source: trunk/Cocoa/F-Script Anywhere/Source/SCPatch/Common/SCPatchMessenger.cp @ 153

Last change on this file since 153 was 153, checked in by Nicholas Riley, 16 years ago

Integrates SCPatch and mach_inject; unfinished, buggy.

File size: 12.7 KB
Line 
1#include <Carbon/Carbon.h>
2#include <syslog.h> 
3#include <unistd.h> 
4#include "SCPatchPrivate.h"
5#include "SCPatchMessenger.h"
6
7//-------------------------------------------------------------------------------------------------------------
8SCPatchMessenger::SCPatchMessenger(void)
9{
10        mThreaded = false;
11        mMessagePort = NULL;
12        mMessageSource = NULL;
13        pthread_mutex_init(&mMessageQueueMutex, NULL);
14        mMessageRetryInterval = DEFAULT_MESSAGE_RETRY_INTERVAL;
15        mMessageRetryLimit = DEFAULT_MESSAGE_RETRY_LIMIT;
16       
17        mMessageQueueDict = CFDictionaryCreateMutable(NULL, 0, &kCFTypeDictionaryKeyCallBacks, NULL);
18        mMessageThreadDict = CFDictionaryCreateMutable(NULL, 0, NULL, &kCFTypeDictionaryValueCallBacks);
19}
20
21//-------------------------------------------------------------------------------------------------------------
22SCPatchMessenger::~SCPatchMessenger(void)
23{
24        CFRelease(mMessageQueueDict);
25        CFRelease(mMessageThreadDict);
26}
27
28//-------------------------------------------------------------------------------------------------------------
29OSErr SCPatchMessenger::StartListening(CFStringRef bundleIdentifier, 
30                                                        Boolean isPatch, Boolean threaded)
31{
32        OSErr                                   err = noErr;
33        CFStringRef                             messagePortName;
34        CFMessagePortContext    messageContext = { 0 };
35        Boolean                                 messageShouldFree;
36        EventLoopRef                    mainLoop;
37       
38        mThreaded = threaded;
39       
40        // Create a port to receive messages
41        messageContext.info = this;
42       
43        if(isPatch)
44                messagePortName = CFStringCreateWithFormat(NULL, NULL, CFSTR("%@.%ld"), bundleIdentifier, getpid());
45        else
46                messagePortName = CFStringCreateCopy(NULL, bundleIdentifier);
47               
48        if(messagePortName == NULL)
49                return memFullErr;
50
51        if(messagePortName) 
52                mMessagePort = CFMessagePortCreateLocal(NULL, messagePortName, ReceiveCallback, &messageContext, &messageShouldFree);
53
54        CFRelease(messagePortName);
55
56        if(mMessagePort == NULL)
57                return destPortErr;
58               
59        // Set up to receive messages from the controller
60        if((mMessageSource = CFMessagePortCreateRunLoopSource(NULL, mMessagePort, 0)) != NULL &&
61           (mainLoop = GetMainEventLoop()) != NULL)
62        {
63                OSErr                   err;
64                AEDesc                  desc;
65                OSType                  sig = 'SCPM';
66
67                CFRunLoopAddSource((CFRunLoopRef)GetCFRunLoopFromEventLoop(mainLoop), mMessageSource, kCFRunLoopCommonModes);
68               
69                // Tell the patch code that it has officially been started.
70                err = AEBuildAppleEvent(kEventClassSCPatchMessenger, kEventSCPatchStart,
71                                                                typeApplSignature, &sig, sizeof(sig),
72                                                                kAutoGenerateReturnID, kAnyTransactionID,
73                                                                &desc, NULL, "");
74                ReceiveMessage(&desc);
75        }
76        else
77        {
78                CFRelease(mMessagePort);
79                mMessagePort = NULL;
80        }
81       
82        return err;
83}
84
85//-------------------------------------------------------------------------------------------------------------                                                 
86OSErr SCPatchMessenger::StopListening(void)
87{
88        EventLoopRef    mainLoop;
89        OSErr                   err;
90        AEDesc                  desc;
91        OSType                  sig = 'SCPM';
92
93        if(mMessageSource && (mainLoop = GetMainEventLoop()) != NULL)
94        {
95                // Tell the patch code that it's about to be stopped.
96                err = AEBuildAppleEvent(kEventClassSCPatchMessenger, kEventSCPatchStop,
97                                                                typeApplSignature, &sig, sizeof(sig),
98                                                                kAutoGenerateReturnID, kAnyTransactionID,
99                                                                &desc, NULL, "");
100                ReceiveMessage(&desc);
101
102                CFRunLoopRemoveSource((CFRunLoopRef)GetCFRunLoopFromEventLoop(mainLoop), mMessageSource, kCFRunLoopCommonModes);
103                CFRelease(mMessageSource);
104                mMessageSource = NULL;
105        }
106       
107        if(mMessagePort)
108        {
109                CFRelease(mMessagePort);
110                mMessagePort = NULL;
111        }
112        return noErr;
113}
114
115//-------------------------------------------------------------------------------------------------------------
116OSErr SCPatchMessenger::SendPing(CFStringRef bundleIdentifier, ProcessSerialNumber *psn)
117{
118        OSErr                   err;
119        AEDesc                  desc;
120        OSType                  sig = 'SCPM';
121
122        err = AEBuildAppleEvent(kSCMessageClass, kSCPing,
123                                                        typeApplSignature, &sig, sizeof(sig),
124                                                        kAutoGenerateReturnID, kAnyTransactionID,
125                                                        &desc, NULL, "");
126
127        if(err == noErr)
128                err = SendMessage(bundleIdentifier, psn, &desc, false);
129
130        return err;
131}
132
133//-------------------------------------------------------------------------------------------------------------
134OSErr SCPatchMessenger::SendMessage(CFStringRef bundleIdentifier,
135                                                ProcessSerialNumber *psn,
136                                                AEEventClass evtClass, AEEventID evtID,
137                                                const char *paramsFmt, ...)
138{
139        OSErr                   err;
140        AEDesc                  desc;
141        va_list                 args;
142        OSType                  sig = 'SCPM';
143
144        va_start(args, paramsFmt);
145        // fprintf(stderr, "Sending message %.4s:%.4s\n", &evtClass, &evtID);
146        err = vAEBuildAppleEvent(evtClass, evtID,
147                                                        typeApplSignature, &sig, sizeof(sig),
148                                                        kAutoGenerateReturnID, kAnyTransactionID,
149                                                        &desc, NULL, paramsFmt, args);
150        va_end(args);
151
152        if(err == noErr)
153                err = SendMessage(bundleIdentifier, psn, &desc, mThreaded);
154       
155        return err;
156}
157
158
159//-------------------------------------------------------------------------------------------------------------
160OSErr SCPatchMessenger::SendMessage(CFStringRef bundleIdentifier, 
161                                                ProcessSerialNumber *psn, 
162                                                const AppleEvent *theAE,
163                                                Boolean threaded)
164{
165        OSErr                           err;
166        pid_t                           pid = 0;
167        AEMessageInfo           *info;
168
169        if(psn && (err = GetProcessPID(psn, &pid)) != noErr)
170                return err;
171       
172        // If we don't succeed or get an error, there's not enough memory
173        err = memFullErr;
174       
175        if((info = (AEMessageInfo *)NewPtrClear(sizeof(AEMessageInfo))) != NULL &&
176           (info->descSize = AEGetDescDataSize(theAE)) > 0 && 
177       (info->descData = NewPtr(info->descSize)) != NULL &&
178       AEGetDescData(theAE, info->descData, info->descSize) == noErr)
179        {
180                if(pid)
181                        info->portName = CFStringCreateWithFormat(NULL, NULL, CFSTR("%@.%ld"), bundleIdentifier, pid);
182                else
183                        info->portName = CFStringCreateCopy(NULL, bundleIdentifier);
184               
185                if(info->portName)
186                {
187                        info->threaded = threaded;
188
189                        if(!threaded)
190                        {
191                                err = SendMessage(info);
192                                DisposePtr(info->descData);
193                                DisposePtr((Ptr)info);
194                        }
195                        else if((err = QueueMessage(info)) != noErr)
196                        {
197                                err = queueFull;
198                        }
199                       
200                }
201        }
202
203        return err;
204}
205
206//-------------------------------------------------------------------------------------------------------------
207OSErr SCPatchMessenger::SendMessage(AEMessageInfo *info)
208{
209        SInt32                          result = kCFMessagePortTransportError;
210        OSErr                           err = noErr;
211        CFDataRef                       dataRef;
212        CFMessagePortRef        portRef;
213
214        if((dataRef = CFDataCreate(NULL, (unsigned char*)info->descData, info->descSize)) != NULL)
215        {
216           if((portRef = CFMessagePortCreateRemote(NULL, info->portName)) != NULL)
217                {
218                        result = CFMessagePortSendRequest(portRef, kSCMessageClass, dataRef, info->threaded ? 0.01 : 5, 0, NULL, NULL);
219                        CFRelease(portRef);
220                }
221                else
222                {
223                        err = destPortErr;
224                }
225                CFRelease(dataRef);
226        }
227        else
228        {
229                err = memFullErr;
230        }
231       
232        // If we're not going to retry, if we've retried too many times, if the message was sent successfully, 
233        // or there was some unrecoverable error like destPortErr (no one's listening) give up and let the
234        // error pass to the thread loop so it can clean up.
235        if(!info->threaded  || info->retryCount >= mMessageRetryLimit || 
236           (err == noErr && result == kCFMessagePortSuccess) || 
237            err == destPortErr || err == memFullErr)
238        {
239                if(err == noErr && result != kCFMessagePortSuccess)
240                        err = (result == kCFMessagePortSendTimeout ? noResponseErr : networkErr);
241#if 0
242                if(err != noErr || result != kCFMessagePortSuccess)
243                        syslog(LOG_ALERT, "%d: Message to %s failed (err = %d result = %d)\n", getpid(), CFStringGetCStringPtr(info->portName, kCFStringEncodingMacRoman), err, result);
244                else
245                        syslog(LOG_ALERT, "%d: Message sent to %s\n", getpid(), CFStringGetCStringPtr(info->portName, kCFStringEncodingMacRoman));
246                       
247#endif
248        }
249        else if(info->threaded) // drop the sending thread's priority and hang around for mMessageRetryInterval
250        {
251                int                                     policy;
252                struct sched_param      param;
253                struct timeval          startTime, endTime;
254               
255                info->retryCount++;
256                pthread_getschedparam(pthread_self(), &policy, &param);
257                param.sched_priority = sched_get_priority_min(policy);
258                pthread_setschedparam(pthread_self(), policy, &param);
259                gettimeofday(&startTime, NULL);
260                for(;;)
261                {
262                        gettimeofday(&endTime, NULL);
263                        if((endTime.tv_sec - startTime.tv_sec) * 1000000L + (endTime.tv_usec - startTime.tv_usec) > 
264                           mMessageRetryInterval * 1000000)
265                        {
266                                // syslog(LOG_ALERT, "%d: waiting to resend on %s (%d)\n", 
267                                           // getpid(), CFStringGetCStringPtr(info->portName, kCFStringEncodingMacRoman), info->retryCount);
268                                break;
269                        }
270                }
271               
272                err = channelBusy;
273        }
274       
275        return err;
276}
277
278//-------------------------------------------------------------------------------------------------------------
279void *SCPatchMessenger::SendThreadEntryPoint(void *infoPtr)
280{
281        SCPatchMessenger        *self = (SCPatchMessenger *)infoPtr;
282        AEMessageInfo           *info;
283        OSStatus                        err;
284
285        while((info = self->GetNextMessage(pthread_self())) != NULL)
286        {
287                // This will block for mMessageRetryInterval if the send fails
288                err = self->SendMessage(info);
289               
290                // If we've retried too many times, if the message was sent successfully, or if there
291                //  was some unrecoverable error like destPortErr (no one's listening) give up.
292                if(err == noErr || err == destPortErr || err == memFullErr || err == noResponseErr)
293                {
294                        if((info = self->DequeueMessage(pthread_self())) != NULL)
295                        {
296                                CFRelease(info->portName);
297                                DisposePtr(info->descData);
298                                DisposePtr((Ptr)info);
299                        }
300                }
301        }
302       
303        return (void *)err;
304}
305
306//-------------------------------------------------------------------------------------------------------------
307OSErr SCPatchMessenger::QueueMessage(AEMessageInfo *info)
308{
309        CFMutableArrayRef       queue;
310        OSErr                           result = noErr;
311       
312        pthread_mutex_lock(&mMessageQueueMutex);
313       
314        if((queue = (CFMutableArrayRef)CFDictionaryGetValue(mMessageQueueDict, info->portName)) == NULL)
315        {
316                if((queue = CFArrayCreateMutable(NULL, 0, NULL)) != NULL)
317                {
318                        pthread_t       messageThread;
319
320                        CFDictionaryAddValue(mMessageQueueDict, info->portName, queue);
321                        pthread_create(&messageThread, NULL, SendThreadEntryPoint, (void *)this);
322                        pthread_detach(messageThread);
323                        CFDictionaryAddValue(mMessageThreadDict, messageThread, info->portName);
324                }
325        }
326       
327        if(queue)
328                CFArrayAppendValue(queue, info);
329        else
330                result = queueFull;
331
332        pthread_mutex_unlock(&mMessageQueueMutex);
333        return result;
334}
335
336//-------------------------------------------------------------------------------------------------------------
337AEMessageInfo *SCPatchMessenger::GetNextMessage(pthread_t threadID)
338{
339        CFMutableArrayRef       queue;
340        CFStringRef                     queueName;
341        AEMessageInfo           *result = NULL;
342       
343        pthread_mutex_lock(&mMessageQueueMutex);
344
345        if((queueName = (CFStringRef)CFDictionaryGetValue(mMessageThreadDict, threadID)) != NULL)
346                if((queue = (CFMutableArrayRef)CFDictionaryGetValue(mMessageQueueDict, queueName)) != NULL)
347                        if(CFArrayGetCount(queue) > 0)
348                                result = (AEMessageInfo *)CFArrayGetValueAtIndex(queue, 0);
349
350        pthread_mutex_unlock(&mMessageQueueMutex);
351       
352        return result;
353}
354
355//-------------------------------------------------------------------------------------------------------------
356AEMessageInfo *SCPatchMessenger::DequeueMessage(pthread_t threadID)
357{
358        CFMutableArrayRef       queue;
359        CFStringRef                     queueName;
360        Boolean                         stopThread = false;
361        AEMessageInfo           *result = NULL;
362       
363        pthread_mutex_lock(&mMessageQueueMutex);
364
365        if((queueName = (CFStringRef)CFDictionaryGetValue(mMessageThreadDict, threadID)) != NULL &&
366           (queue = (CFMutableArrayRef)CFDictionaryGetValue(mMessageQueueDict, queueName)) != NULL)
367        {
368                if(CFArrayGetCount(queue) > 0)
369                {
370                        result = (AEMessageInfo *)CFArrayGetValueAtIndex(queue, 0);
371                        CFArrayRemoveValueAtIndex(queue, 0);
372                }
373                if(CFArrayGetCount(queue) == 0)
374                {
375                        CFDictionaryRemoveValue(mMessageThreadDict, threadID);
376                        CFDictionaryRemoveValue(mMessageQueueDict, queueName);
377                        stopThread = true;
378                }
379        }
380
381        pthread_mutex_unlock(&mMessageQueueMutex);
382       
383        if(stopThread)
384                pthread_cancel(threadID);
385       
386        return result;
387}
388
389//-------------------------------------------------------------------------------------------------------------
390CFDataRef SCPatchMessenger::ReceiveCallback(CFMessagePortRef local, 
391                                                SInt32 message, 
392                                                CFDataRef dataRef, 
393                                                void *info)
394{
395        #pragma unused(local)
396       
397        SCPatchMessenger        *self = (SCPatchMessenger *)info;
398        Ptr                                     descData = NULL;
399        AEDesc                          desc;
400        long                            length;
401       
402        // fprintf(stderr, "%d: Receiving started\n", getpid());
403        if(message == kSCMessageClass)
404        {
405            if((length = CFDataGetLength(dataRef)) > 0 && (descData = NewPtr(length)) != NULL)
406            {
407                        CFDataGetBytes(dataRef, CFRangeMake(0, length), (unsigned char *)descData);
408
409                        if(AECreateDesc(typeAppleEvent, descData, length, &desc) == noErr)
410                        {
411                                // fprintf(stderr, "%d: handling message\n", getpid());
412                                if (self->HandleMessage(&desc) != noErr)
413                                        self->ReceiveMessage(&desc);
414                                // fprintf(stderr, "%d: done handling message\n", getpid());
415                                AEDisposeDesc(&desc);
416                        }
417                        else
418                        {
419                                fprintf(stderr, "%d: Error %d building AppleEvent", getpid(), memFullErr);
420                        }
421                }
422        }
423
424        if(descData)
425                DisposePtr(descData);
426       
427        // fprintf(stderr, "%d: Receiving finished\n", getpid());
428        return NULL;
429}
430
Note: See TracBrowser for help on using the repository browser.