<?php
    define
('FEEDCACHE''./cache');
    
    class 
FeedMulti {
        
/*
        Provides distributed processing of multiple URL feeds
        */
        
        /* Public read/write */
        
        //URLs to retrieve
        
var $urls = array();
        
        
//Maximum time (in seconds) for feeds, 0 for 'infinite'
        
var $timeout 0;
        var 
$seed false//If seed is 'true' timeout is ignored, so the cache can be 'seeded'
        
        //Cache time for each URL - unset means no cache
        
var $cache_time = array();
        var 
$cache_path FEEDCACHE;
        
        
//Purge can be set to 0 (default is 1) for any feed URL
        //If 0, the file is chmod-ed read-only
        //Any cache clean-up should honor that as much as possible
        //See purge function below (it's a hack, but it works for now)
        
var $purge = array();
        
        
//Chunk size (in bytes) for reading data
        //Defaults to 2048
        
var $chunk_sizes = array();
        
        
//Functions to callback after each chunk of data is read for a URL
        //Should accept 2 arguments: (FeedMulti $this, string $url)
        //Useful for debugging/profiling
        
var $callbacks = array();
        
        
        
/* Public read */
        
        //Results from each URL, indexed by URL
        
var $results = array();
        
        
//Errors from each URL, indexed by URL
        
var $errors = array();
        
        
//Unique ID for any particular instance, useful for debugging. Format may change without notice in future releases
        
var $instance_id 0;
        
        
//Start, end, and elapsed times, indexed by URL, and with instance_id for total times
        
var $start_times = array();
        var 
$end_times = array();
        var 
$elapsed_times = array();
        
        var 
$cache_used = array(); //If a URL was read from cache, this will contain the cache path/filename
        
var $cached = array(); //If URL was succesfully stored in cache, this will contain the cache path/fileanme
        
        /* Private */
        
        //Sockets indexed by URL
        
var $sockets = array();
        
        
//Boolean for which feeds are complete, indexed by URL
        
var $done = array();
        
        function 
__construct(){
            
call_user_func_array(array($this'FeedMulti'), func_get_args());
        }
        
        function 
FeedMulti (){
            
$this->urls func_get_args();
            
//This really should be a cron job, but see notes above func purge...
            
$this->purge();
        }
        
        function 
build_cache_path(){
            if (!
file_exists($this->cache_path)){
                
//PHP 5 has a 'recursive' parameter to mkdir that does all this...
                //find first / leading directory:
                
$subdirs explode('/'$this->cache_path);
                
$dir $this->cache_path[0] == '/' '/' '';
                foreach(
$subdirs as $d){
                    if (
strlen($d)) $dir .= "$d/";
                    if (
strlen($dir) && !file_exists($dir)){
                        
$success mkdir($dir);
                        if (!
$success){
                            
$msg "Unable to create directory $dir for cache (full path: $this->cache_path)";
                            
$this->errors['General'][] = $msg;
                            
error_log($msg);
                            
//might as well quit now
                            
break;
                        }
                    }
                }
            }
        }
        
        
//This just backports microtime(true) from PHP5 to all versions of PHP
        
function mtime(){
            list(
$u$s) = explode(' 'microtime());
            return 
$s $u;
        }
        
        function 
start(){
            global 
$benchmarks;
            global 
$ram;
            
            
//Make sure our cache_path exists:
            
$this->build_cache_path();
            
            
//Generate an ID for this 'search' for tracking/logging/debugging
            
$now $this->mtime();
            
$this->search_id date('Ymd h:i:s'$now) . substr($nowstrpos($now'.'));
            
            
$this->start_times[$this->search_id] = $this->mtime();
            
            
//Open up the sockets and get the engines running for each URL:
            
foreach($this->urls as $url){
                
$parts parse_url($url);
                
$scheme '';
                
$host '';
                
$port '';
                
$user '';
                
$pass '';
                
$path '';
                
$query '';
                
$fragment '';
                
extract($parts);
                
//$scheme, $host, $port, $user, $pass, $path, $query, $fragment
                //This doesn't allow for POST with HTTP, or other methods, which we might need at some point
                //It's really a shame that something that DOES grok all those protocols like fopen won't let one set timeout and non-blocking...
                
                
$this->start_times[$url] = $this->mtime();
                
                
//Currently, only HTTP is implemented
                //check to see if we can use the cache:
                
$cachename md5($url); //MD5 just to avoid goofy symbols in filenames
                //This cache could get insanely large.
                //Use first character to choose a sub-directory to keep number of files per directory sane
                
$cachepath "$this->cache_path/$cachename[0]/$cachename";
                if (isset(
$this->cache_time[$url]) && file_exists($cachepath) && (filemtime($cachepath) >= (time() - $this->cache_time[$url]))){
                    
$this->results[$url] = file_get_contents($cachepath);
                    
#error_log("Read cache: $cachepath");
                    
$this->end_times[$url] = $this->mtime();
                    
$this->done[$url] = true;
                    
$this->cache_used[$url] = $cachepath;
                }
                else switch (
strtolower($scheme)){
                    case 
'http':
                        
//convert http://example.com to http://example.com/
                        
$path strlen($path) ? $path '/'//Default path
                        
$port strlen($port) ? $port 80;//Default HTTP port
                        
                        //This section may be general enough to put into a method as more protocols are added...
                        
                        
$socket = @fsockopen($host$port$errno$error, (($this->timeout && !$this->seed) ? $this->timeout 0xffffffff));
                        if (
$socket === false){
                            
$this->errors[$url][] = "$this->search_id Unable to access $url";
                            if (
$errno){
                                
$this->errors[$url][] = "$this->search_id $errno $error";
                            }
                            
$this->end_times[$url] = $this->mtime();
                        }
                        else{
                            
$non_blocking stream_set_blocking($socket0);
                            if (
$non_blocking == false$this->errors[$url][] = "$this->search_id Unable to make $url non-blocking";
                            if (
$this->timeout && !$this->seed){
                                
$timeout stream_set_timeout($socket$this->timeout);
                                if (
$timeout === false$this->errors[$url][] = "$this->search_id Unable to set $url timeout";
                            }
                            elseif (
$this->seed){
                                
$timeout stream_set_timeout($socket0xffffffff);
                                if (
$timeout === false$this->errors[$url][] = "$this->search_id Unable to force 'seed' timeout of 0xffffffff";
                            }
                            else{
                                
//Perhaps we should also set timeout to 0xffffffff here...
                                //Rather than whatever PHP defaults it to...
                            
}
                            
$this->sockets[$url] = $socket;
                            
                            
//Get the engines running:
                            
$get $path;
                            
$get .= strlen($query) ? "?$query" '';
                            
$get .= strlen($frament) ? "#$fragment" '';
                            
fwrite($socket"GET $get HTTP/1.0\n");
                            
fwrite($socket"Host: $host\n");
                            if (
strlen($user)) fwrite($socket"Authorization: Basic " base64_encode("$user:$pass") . "\r\n");
                            
fwrite($socket"\n");
                        }
                    break;
                    default:
                        
$this->errors[$url][] = "$this->search_id Protocol $scheme is not yet supported";
                    break;
                }
            }
            
            
//Read the data from each feed, as it becomes available and store it in results:
            
$feed_count count($this->urls);
            
//Note that end_times gets set when a feed is done, dead, never opened, or otherwise un-usable
            //done is reserved for feeds that successfully completed to feof()
            //We also 'cheat' and use the total search start time for computing how long to keep going,
            //to avoid the object/array lookup in such a tight loop
            
$s $this->start_times[$this->search_id];
            
$t = ($this->timeout && !$this->seed) ? $this->timeout 0//Here we use 0 for "no timeout"
            
while ((count($this->end_times) < $feed_count) && (!$t || ($this->mtime() < ($s $t)))){
                foreach(
$this->sockets as $url => $socket){
                    if (isset(
$this->end_times[$url])) continue;
                    if (
feof($socket)){
                        
$this->done[$url] = true;
                        
$this->end_times[$url] = $this->mtime();
                        
//for the next iteration, we want to skip this one anyway
                        //since foreach works on a copy of the array, this should work, right?...
                        //only the internal pointer of the original array is getting incremented in parallel, so maybe it's NOT kosher.
                        //unset($this->sockets[$url]);
                        
continue;
                    }
                    
$chunk fread($socket, (isset($chunk_sizes[$url]) ? $chunk_sizes[$url] : 2048));
                    if (
$chunk === false){
                        
$this->errors[$url][] = "$this->search_id Unable to fread $url after byte " strlen($this->results[$url]);
                        
$this->errors[$url][] = "$this->search_id May have partial results for $url";
                        
$this->end_times[$url] = $this->mtime();
                    }
                    else{
                        
$this->results[$url] .= $chunk;
                        if (isset(
$this->callbacks[$url])) call_user_func($this->callbacks[$url], $this$url);
                    }
                }
            }
            
            
$benchmarks['caching results start'] = microtime();
            
$ram['caching results start'] = memory_get_usage();
            
//Any feed that is not "done" should be marked as incomplete:
            //Also, any feed that is done, and could be cached, and the cache is old, cache it
            
clearstatcache(); //Enough time may have elapsed for this to matter...
            
foreach($this->urls as $url){
                
$cachename md5($url); //MD5 just to avoid goofy symbols in filenames
                
$cachepath "$this->cache_path/$cachename[0]/$cachename";
                
#error_log("dirname($cachepath) is " . dirname($cachepath));
                
if (!$this->done[$url]){
                    
$this->errors[$url][] = "Incomplete results from $url";
                    
//At this point, if we have ANYTHING in the cache, it's at least a complete result set, even if it's too old
                    //Better that than an incomplete result, no?
                    
if (isset($this->cache_time[$url]) && file_exists($cachepath)){
                        
$this->results[$url] = file_get_contents($cachepath);
                        
$this->errors[$url][] = "Chose to load obsolete cache from $cachepath in favor of incomplete results";
                    }
                }
                elseif (isset(
$this->cache_time[$url]) && (!file_exists($cachepath) || (filemtime($cachepath) < time() - ($this->cache_time[$url])))){
                    
//cache it
                    
if (!file_exists(dirname($cachepath))) mkdir(dirname($cachepath)) or error_log("Failed to mkdir " dirname($cachepath));
                    
chmod(dirname($cachepath), 0775);
                    
//If we made it read-only previously, make it possible for use to write the new data
                    
if (file_exists($cachepath)) chmod($cachepath0664);
                    
$f fopen($cachepath'w') or error_log("Failed to open $cachepath");
                    
#error_log("opened $cachepath for writing: $f");
                    
if ($f){
                        
$bytes fwrite($f$this->results[$url]);
                        
#error_log("Wrote $bytes bytes to $cachepath");
                        
if ($bytes == strlen($this->results[$url])){
                            
$this->cached[$url] = $cachepath;
                            
chmod($cachepath0664);
                        }
                        else{
                            
$this->errors[$url][] = "Only wrote $bytes bytes of " strlen($this->results[$url]) . " to cache.";
                            
unlink($cachepath);
                        }
                        
fclose($f);
                        
//Make it non-purgable if requested:
                        
if (isset($this->purge[$url]) && !$this->purge[$url]){
                            
chmod($cachepath0444);
                        }
                    }
                    else{
                        
$this->errors[$url][] = "Failed to open $cachepath for writing cache.";
                    }
                }
                
//Compute elapsed time for each feed:
                
$this->elapsed_times[$url] = $this->end_times[$url] - $this->start_times[$url];
            }
            
$benchmarks['caching results end'] = microtime();
            
$ram['caching results end'] = memory_get_usage();
            
            
$this->end_times[$this->search_id] = $this->mtime();
            
$this->elapsed_times[$this->search_id] = $this->end_times[$this->search_id] - $this->start_times[$this->search_id];
            
        }
        
        
//In an ideal world, this would be done by a cron job
        //But it should be run by user www, and I can't do that
        //So we do this rather hokey hack here with a lock file
        
function purge (){
            
$lock_path $this->cache_path '/purge.lock';
            
//purge time is hard-coded here and below to 2 hours. But it's done on constructor, so can't have a member variable for it...
            
if (!file_exists($lock_path) || (filemtime($lock_path) < (time() - 60*60*2))){
                
touch($lock_path);
                
$dir opendir($this->cache_path);
                if (!
$dir$this->errors[][] = "Unable to opendir($this->cache_path)";
                while ((
$file readdir($dir)) !== false){
                    if (
$file != '.' && $file != '..' && $file != 'purge.lock'){
                        
$dir2 opendir("$this->cache_path/$file");
                        if (!
$dir2$this->errors[][] = "Unable to opendir($this->cache_path/$file)";
                        while (
$file2 readdir($dir2)){
                            if (
$file2 != '.' && $file2 != '..'){
                                
$full "$this->cache_path/$file/$file2";
                                if ((
filemtime($full) < (time() - 60*60*2)) && is_writable($full)){
                                    
$success unlink($full);
                                    if (
$success$this->purged[] = $full;
                                    else 
$this->errors[][] = "Failed to purge: $file";
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    
    
    
/* Some testing, to be commented in/out */
    
    //
    
echo "<PRE>";
    
//TEST #1:
    //A test using 3 high-volume, high-availability, well-known URLs:
    
$multi = new FeedMulti('http://www.php.net/''http://www.mysql.com/''http://www.apache.org/''http://www.kernel.org');
    
    function 
progress_report($multi$url){
        echo 
$multi->mtime(), " $url\t"strlen($multi->results[$url]), "\n";
    }
    
    foreach(
$multi->urls as $url){
        
//Un/comment this to watch/hide the wheels spinning in parallel:
        
$multi->callbacks[$url] = 'progress_report';
    }
    
    
$multi->timeout 10//10 seconds seems generous
    
$multi->start();
    
    
var_dump($multi->urls);
    echo 
"\nERRORS:\n"var_dump($multi->errors);
    
//Un/comment to see/hide the actual HTTP results
    //echo "\nRESULTS:\n"; var_dump($multi->results);
    //echo "\nPARALLEL TIMES:\n"; var_dump($multi->elapsed_times);
    
    
$parallel $multi->elapsed_times[$multi->search_id];
    echo 
"\n  PARALLEL SEARCH ELAPSED TIME: $parallel\n";
    
    
    
$start $multi->mtime();
    
$php file_get_contents('http://www.php.net/');
    
$mysql file_get_contents('http://www.mysql.com/');
    
$apache file_get_contents('http://www.apache.org/');
    
$end $multi->mtime();
    
$sequential $end $start;
    echo 
"\nSEQUENTIAL SEARCH ELAPSED TIME: $sequential\n";
    
    
$percent round(100 * ($sequential $parallel)/max($parallel$sequential), 2);
    
$gain_loss $parallel $sequential 'loss' 'gain';
    echo 
"\nBOTTOM LINE: $percent% $gain_loss\n";
    
    echo 
"</PRE>";
    
//
?>