|
TYPO3 API
SVNRelease
|
00001 <?php 00002 00003 /* 00004 * This file is part of SwiftMailer. 00005 * (c) 2004-2009 Chris Corbyn 00006 * 00007 * For the full copyright and license information, please view the LICENSE 00008 * file that was distributed with this source code. 00009 */ 00010 00011 //@require 'Swift/ByteStream/AbstractFilterableInputStream.php'; 00012 //@require 'Swift/ReplacementFilterFactory.php'; 00013 //@require 'Swift/Transport/IoBuffer.php'; 00014 //@require 'Swift/TransportException.php'; 00015 00016 /** 00017 * A generic IoBuffer implementation supporting remote sockets and local processes. 00018 * @package Swift 00019 * @subpackage Transport 00020 * @author Chris Corbyn 00021 */ 00022 class Swift_Transport_StreamBuffer 00023 extends Swift_ByteStream_AbstractFilterableInputStream 00024 implements Swift_Transport_IoBuffer 00025 { 00026 00027 /** A primary socket */ 00028 private $_stream; 00029 00030 /** The input stream */ 00031 private $_in; 00032 00033 /** The output stream */ 00034 private $_out; 00035 00036 /** Buffer initialization parameters */ 00037 private $_params = array(); 00038 00039 /** The ReplacementFilterFactory */ 00040 private $_replacementFactory; 00041 00042 /** Translations performed on data being streamed into the buffer */ 00043 private $_translations = array(); 00044 00045 /** 00046 * Create a new StreamBuffer using $replacementFactory for transformations. 00047 * @param Swift_ReplacementFilterFactory $replacementFactory 00048 */ 00049 public function __construct( 00050 Swift_ReplacementFilterFactory $replacementFactory) 00051 { 00052 $this->_replacementFactory = $replacementFactory; 00053 } 00054 00055 /** 00056 * Perform any initialization needed, using the given $params. 00057 * Parameters will vary depending upon the type of IoBuffer used. 00058 * @param array $params 00059 */ 00060 public function initialize(array $params) 00061 { 00062 $this->_params = $params; 00063 switch ($params['type']) 00064 { 00065 case self::TYPE_PROCESS: 00066 $this->_establishProcessConnection(); 00067 break; 00068 case self::TYPE_SOCKET: 00069 default: 00070 $this->_establishSocketConnection(); 00071 break; 00072 } 00073 } 00074 00075 /** 00076 * Set an individual param on the buffer (e.g. switching to SSL). 00077 * @param string $param 00078 * @param mixed $value 00079 */ 00080 public function setParam($param, $value) 00081 { 00082 if (isset($this->_stream)) 00083 { 00084 switch ($param) 00085 { 00086 case 'protocol': 00087 if (!array_key_exists('protocol', $this->_params) 00088 || $value != $this->_params['protocol']) 00089 { 00090 if ('tls' == $value) 00091 { 00092 stream_socket_enable_crypto( 00093 $this->_stream, true, STREAM_CRYPTO_METHOD_TLS_CLIENT 00094 ); 00095 } 00096 } 00097 break; 00098 } 00099 } 00100 $this->_params[$param] = $value; 00101 } 00102 00103 /** 00104 * Perform any shutdown logic needed. 00105 */ 00106 public function terminate() 00107 { 00108 if (isset($this->_stream)) 00109 { 00110 switch ($this->_params['type']) 00111 { 00112 case self::TYPE_PROCESS: 00113 fclose($this->_in); 00114 fclose($this->_out); 00115 proc_close($this->_stream); 00116 break; 00117 case self::TYPE_SOCKET: 00118 default: 00119 fclose($this->_stream); 00120 break; 00121 } 00122 } 00123 $this->_stream = null; 00124 $this->_out = null; 00125 $this->_in = null; 00126 } 00127 00128 /** 00129 * Set an array of string replacements which should be made on data written 00130 * to the buffer. This could replace LF with CRLF for example. 00131 * @param string[] $replacements 00132 */ 00133 public function setWriteTranslations(array $replacements) 00134 { 00135 foreach ($this->_translations as $search => $replace) 00136 { 00137 if (!isset($replacements[$search])) 00138 { 00139 $this->removeFilter($search); 00140 unset($this->_translations[$search]); 00141 } 00142 } 00143 00144 foreach ($replacements as $search => $replace) 00145 { 00146 if (!isset($this->_translations[$search])) 00147 { 00148 $this->addFilter( 00149 $this->_replacementFactory->createFilter($search, $replace), $search 00150 ); 00151 $this->_translations[$search] = true; 00152 } 00153 } 00154 } 00155 00156 /** 00157 * Get a line of output (including any CRLF). 00158 * The $sequence number comes from any writes and may or may not be used 00159 * depending upon the implementation. 00160 * @param int $sequence of last write to scan from 00161 * @return string 00162 */ 00163 public function readLine($sequence) 00164 { 00165 if (isset($this->_out) && !feof($this->_out)) 00166 { 00167 $line = fgets($this->_out); 00168 return $line; 00169 } 00170 } 00171 00172 /** 00173 * Reads $length bytes from the stream into a string and moves the pointer 00174 * through the stream by $length. If less bytes exist than are requested the 00175 * remaining bytes are given instead. If no bytes are remaining at all, boolean 00176 * false is returned. 00177 * @param int $length 00178 * @return string 00179 */ 00180 public function read($length) 00181 { 00182 if (isset($this->_out) && !feof($this->_out)) 00183 { 00184 $ret = fread($this->_out, $length); 00185 return $ret; 00186 } 00187 } 00188 00189 /** Not implemented */ 00190 public function setReadPointer($byteOffset) 00191 { 00192 } 00193 00194 // -- Protected methods 00195 00196 /** Flush the stream contents */ 00197 protected function _flush() 00198 { 00199 if (isset($this->_in)) 00200 { 00201 fflush($this->_in); 00202 } 00203 } 00204 00205 /** Write this bytes to the stream */ 00206 protected function _commit($bytes) 00207 { 00208 if (isset($this->_in) 00209 && fwrite($this->_in, $bytes)) 00210 { 00211 return ++$this->_sequence; 00212 } 00213 } 00214 00215 // -- Private methods 00216 00217 /** 00218 * Establishes a connection to a remote server. 00219 * @access private 00220 */ 00221 private function _establishSocketConnection() 00222 { 00223 $host = $this->_params['host']; 00224 if (!empty($this->_params['protocol'])) 00225 { 00226 $host = $this->_params['protocol'] . '://' . $host; 00227 } 00228 $timeout = 15; 00229 if (!empty($this->_params['timeout'])) 00230 { 00231 $timeout = $this->_params['timeout']; 00232 } 00233 if (!$this->_stream = fsockopen($host, $this->_params['port'], $errno, $errstr, $timeout)) 00234 { 00235 throw new Swift_TransportException( 00236 'Connection could not be established with host ' . $this->_params['host'] . 00237 ' [' . $errstr . ' #' . $errno . ']' 00238 ); 00239 } 00240 if (!empty($this->_params['blocking'])) 00241 { 00242 stream_set_blocking($this->_stream, 1); 00243 } 00244 else 00245 { 00246 stream_set_blocking($this->_stream, 0); 00247 } 00248 $this->_in =& $this->_stream; 00249 $this->_out =& $this->_stream; 00250 } 00251 00252 /** 00253 * Opens a process for input/output. 00254 * @access private 00255 */ 00256 private function _establishProcessConnection() 00257 { 00258 $command = $this->_params['command']; 00259 $descriptorSpec = array( 00260 0 => array('pipe', 'r'), 00261 1 => array('pipe', 'w'), 00262 2 => array('pipe', 'w') 00263 ); 00264 $this->_stream = proc_open($command, $descriptorSpec, $pipes); 00265 stream_set_blocking($pipes[2], 0); 00266 if ($err = stream_get_contents($pipes[2])) 00267 { 00268 throw new Swift_TransportException( 00269 'Process could not be started [' . $err . ']' 00270 ); 00271 } 00272 $this->_in =& $pipes[0]; 00273 $this->_out =& $pipes[1]; 00274 } 00275 00276 }
1.8.0