TYPO3 API  SVNRelease
StreamBuffer.php
Go to the documentation of this file.
00001 <?php
00002 
00003 /*
00004  * This file is part of SwiftMailer.
00005  * (c) 2004-2009 Chris Corbyn
00006  *
00007  * For the full copyright and license information, please view the LICENSE
00008  * file that was distributed with this source code.
00009  */
00010 
00011 //@require 'Swift/ByteStream/AbstractFilterableInputStream.php';
00012 //@require 'Swift/ReplacementFilterFactory.php';
00013 //@require 'Swift/Transport/IoBuffer.php';
00014 //@require 'Swift/TransportException.php';
00015 
/**
 * A generic IoBuffer implementation supporting remote sockets and local processes.
 *
 * Depending on the 'type' parameter passed to initialize(), the buffer is backed
 * either by a socket opened with fsockopen() (one handle used for both reading
 * and writing) or by a child process opened with proc_open() (its stdin is
 * written to and its stdout is read from).
 *
 * @package Swift
 * @subpackage Transport
 * @author Chris Corbyn
 */
class Swift_Transport_StreamBuffer
  extends Swift_ByteStream_AbstractFilterableInputStream
  implements Swift_Transport_IoBuffer
{

  /** The primary handle: the socket, or the process handle from proc_open() */
  private $_stream;

  /** The handle that bytes are written to (the socket, or the process' stdin) */
  private $_in;

  /** The handle that bytes are read from (the socket, or the process' stdout) */
  private $_out;

  /** Buffer initialization parameters */
  private $_params = array();

  /** The ReplacementFilterFactory used to build write-translation filters */
  private $_replacementFactory;

  /** Search strings (as keys) which currently have a translation filter installed */
  private $_translations = array();

  /**
   * Create a new StreamBuffer using $replacementFactory for transformations.
   * @param Swift_ReplacementFilterFactory $replacementFactory
   */
  public function __construct(
    Swift_ReplacementFilterFactory $replacementFactory)
  {
    $this->_replacementFactory = $replacementFactory;
  }

  /**
   * Perform any initialization needed, using the given $params.
   * Parameters will vary depending upon the type of IoBuffer used.
   * A missing or unrecognised 'type' is treated as a socket connection.
   * @param array $params
   * @throws Swift_TransportException if the connection cannot be established
   */
  public function initialize(array $params)
  {
    $this->_params = $params;
    // Avoid an undefined-index notice; socket is already the default branch.
    $type = isset($params['type']) ? $params['type'] : self::TYPE_SOCKET;
    switch ($type)
    {
      case self::TYPE_PROCESS:
        $this->_establishProcessConnection();
        break;
      case self::TYPE_SOCKET:
      default:
        $this->_establishSocketConnection();
        break;
    }
  }

  /**
   * Set an individual param on the buffer (e.g. switching to SSL).
   * When a stream is open and 'protocol' changes to 'tls', TLS client crypto
   * is enabled on the open stream. The value is always stored afterwards.
   * @param string $param
   * @param mixed $value
   */
  public function setParam($param, $value)
  {
    if (isset($this->_stream))
    {
      switch ($param)
      {
        case 'protocol':
          if (!array_key_exists('protocol', $this->_params)
            || $value != $this->_params['protocol'])
          {
            if ('tls' == $value)
            {
              // NOTE(review): the result of the handshake is not inspected here;
              // callers have no way to learn that enabling crypto failed.
              stream_socket_enable_crypto(
                $this->_stream, true, STREAM_CRYPTO_METHOD_TLS_CLIENT
                );
            }
          }
          break;
      }
    }
    $this->_params[$param] = $value;
  }

  /**
   * Perform any shutdown logic needed.
   * Closes the open handles (pipes and process, or the socket) and resets
   * the buffer so that no stream is considered open.
   */
  public function terminate()
  {
    if (isset($this->_stream))
    {
      $type = isset($this->_params['type'])
        ? $this->_params['type'] : self::TYPE_SOCKET;
      switch ($type)
      {
        case self::TYPE_PROCESS:
          fclose($this->_in);
          fclose($this->_out);
          proc_close($this->_stream);
          break;
        case self::TYPE_SOCKET:
        default:
          fclose($this->_stream);
          break;
      }
    }
    $this->_stream = null;
    $this->_out = null;
    $this->_in = null;
  }

  /**
   * Set an array of string replacements which should be made on data written
   * to the buffer.  This could replace LF with CRLF for example.
   * Filters for search strings no longer present are removed, and filters are
   * added only for search strings not already registered.
   * @param string[] $replacements
   */
  public function setWriteTranslations(array $replacements)
  {
    // Drop filters whose search string is absent from the new set.
    foreach ($this->_translations as $search => $replace)
    {
      if (!isset($replacements[$search]))
      {
        $this->removeFilter($search);
        unset($this->_translations[$search]);
      }
    }

    // Install filters for newly requested search strings, keyed by the search.
    foreach ($replacements as $search => $replace)
    {
      if (!isset($this->_translations[$search]))
      {
        $this->addFilter(
          $this->_replacementFactory->createFilter($search, $replace), $search
          );
        $this->_translations[$search] = true;
      }
    }
  }

  /**
   * Get a line of output (including any CRLF).
   * The $sequence number comes from any writes and may or may not be used
   * depending upon the implementation; this implementation ignores it.
   * Nothing (null) is returned when no output stream is open or it is at EOF.
   * @param int $sequence of last write to scan from
   * @return string
   */
  public function readLine($sequence)
  {
    if (isset($this->_out) && !feof($this->_out))
    {
      $line = fgets($this->_out);
      return $line;
    }
  }

  /**
   * Reads $length bytes from the stream into a string and moves the pointer
   * through the stream by $length. If less bytes exist than are requested the
   * remaining bytes are given instead. If no bytes are remaining at all, boolean
   * false is returned.
   * @param int $length
   * @return string|boolean the bytes read, or false if the stream is closed or at EOF
   */
  public function read($length)
  {
    if (isset($this->_out) && !feof($this->_out))
    {
      $ret = fread($this->_out, $length);
      return $ret;
    }
    // Honour the documented contract: signal "nothing left" with false
    // rather than falling off the end and returning null.
    return false;
  }

  /** Not implemented */
  public function setReadPointer($byteOffset)
  {
  }

  // -- Protected methods

  /** Flush the stream contents */
  protected function _flush()
  {
    if (isset($this->_in))
    {
      fflush($this->_in);
    }
  }

  /**
   * Write these bytes to the stream.
   * fwrite() on a network stream may write fewer bytes than requested, so the
   * write is repeated on the remainder until everything has been written or
   * the stream stops accepting data.
   * @param string $bytes
   * @return int|null the new sequence number if any bytes were written
   */
  protected function _commit($bytes)
  {
    if (!isset($this->_in))
    {
      return;
    }

    $bytesToWrite = strlen($bytes);
    $totalBytesWritten = 0;
    while ($totalBytesWritten < $bytesToWrite)
    {
      $bytesWritten = fwrite($this->_in, substr($bytes, $totalBytesWritten));
      if (false === $bytesWritten || 0 === $bytesWritten)
      {
        // Error or no progress: stop instead of spinning forever.
        break;
      }
      $totalBytesWritten += $bytesWritten;
    }

    if ($totalBytesWritten > 0)
    {
      return ++$this->_sequence;
    }
  }

  // -- Private methods

  /**
   * Establishes a connection to a remote server.
   * Uses the 'host', 'port' and optional 'protocol', 'timeout' (default 15
   * seconds) and 'blocking' params. The socket serves as both input and output.
   * @throws Swift_TransportException if the connection cannot be opened
   * @access private
   */
  private function _establishSocketConnection()
  {
    $host = $this->_params['host'];
    if (!empty($this->_params['protocol']))
    {
      $host = $this->_params['protocol'] . '://' . $host;
    }
    $timeout = 15;
    if (!empty($this->_params['timeout']))
    {
      $timeout = $this->_params['timeout'];
    }

    $socket = fsockopen($host, $this->_params['port'], $errno, $errstr, $timeout);
    if (!$socket)
    {
      // Never leave boolean false in $_stream: isset() would treat it as an
      // open stream and terminate()/setParam() would act on a non-resource.
      $this->_stream = null;
      throw new Swift_TransportException(
        'Connection could not be established with host ' . $this->_params['host'] .
        ' [' . $errstr . ' #' . $errno . ']'
        );
    }
    $this->_stream = $socket;

    if (!empty($this->_params['blocking']))
    {
      stream_set_blocking($this->_stream, 1);
    }
    else
    {
      stream_set_blocking($this->_stream, 0);
    }

    // Bound by reference so that resetting $_stream also resets both ends.
    $this->_in =& $this->_stream;
    $this->_out =& $this->_stream;
  }

  /**
   * Opens a process for input/output.
   * Runs the 'command' param with pipes for stdin, stdout and stderr. Anything
   * immediately available on stderr is treated as a start-up failure.
   * @throws Swift_TransportException if the process cannot be started
   * @access private
   */
  private function _establishProcessConnection()
  {
    $command = $this->_params['command'];
    $descriptorSpec = array(
      0 => array('pipe', 'r'),
      1 => array('pipe', 'w'),
      2 => array('pipe', 'w')
      );
    $pipes = array();
    $process = proc_open($command, $descriptorSpec, $pipes);
    if (false === $process)
    {
      $this->_stream = null;
      throw new Swift_TransportException(
        'Process could not be started [proc_open() failed]'
        );
    }

    // Non-blocking so that an empty stderr does not stall the check below.
    stream_set_blocking($pipes[2], 0);
    $err = stream_get_contents($pipes[2]);
    if ($err)
    {
      // Release everything that was opened so nothing leaks and the buffer
      // is not left looking half-initialized.
      foreach ($pipes as $pipe)
      {
        fclose($pipe);
      }
      proc_terminate($process);
      proc_close($process);
      $this->_stream = null;
      throw new Swift_TransportException(
        'Process could not be started [' . $err . ']'
        );
    }

    $this->_stream = $process;
    $this->_in =& $pipes[0];
    $this->_out =& $pipes[1];
  }

}