neutralts/bif/
exec_php.rs

1use crate::{bif::BifError, Value};
2use std::fs;
3use std::io::{ErrorKind, Read, Write};
4use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
5#[cfg(unix)]
6use std::os::unix::net::UnixStream;
7use std::path::Path;
8
/// FastCGI protocol version written into every outgoing record header and
/// required in the first byte of every incoming one.
const FCGI_VERSION_1: u8 = 1;
/// Record type of the record that opens a request.
const FCGI_BEGIN_REQUEST: u8 = 1;
/// Record type that terminates the response; reading stops when it arrives.
const FCGI_END_REQUEST: u8 = 3;
/// Record type carrying the encoded name/value parameters.
const FCGI_PARAMS: u8 = 4;
/// Record type carrying the request body.
const FCGI_STDIN: u8 = 5;
/// Record type whose content is collected as the response's standard output.
const FCGI_STDOUT: u8 = 6;
/// Record type whose content is collected as the response's standard error.
const FCGI_STDERR: u8 = 7;
/// Role value placed (big-endian) in the first two bytes of the begin-request body.
const FCGI_RESPONDER: u16 = 1;
/// Request id used for every record sent; response records with another id are skipped.
const FCGI_REQUEST_ID: u16 = 1;
/// Timeout, in seconds, applied to TCP connect and to stream reads and writes.
const PHP_FPM_TIMEOUT_SECS: u64 = 5;
/// Key of the JSON object member through which the bridge script reports an error.
const BRIDGE_ERROR_KEY: &str = "__neutralts_obj_error";
20
/// PHP source of the bridge script that PHP-FPM executes on behalf of `exec_php`.
///
/// The script reads a JSON object from `php://input` with the members
/// `script_file`, `callback` (defaults to `"main"`), `params`, `schema`,
/// `schema_data` and `venv`. The last three are published through `$GLOBALS`.
/// It then `require_once`s `script_file`, calls `callback` with `params`, and
/// echoes the JSON-encoded return value.
///
/// Every failure is reported as a JSON object whose only member uses the key
/// `"__neutralts_obj_error"`; that literal must stay equal to `BRIDGE_ERROR_KEY`.
const BRIDGE_SCRIPT: &str = r#"<?php
header('Content-Type: application/json');
$raw = file_get_contents('php://input');
$payload = json_decode($raw, true);

if (!is_array($payload)) {
    echo json_encode(["__neutralts_obj_error" => "invalid payload"]);
    exit;
}

$script_file = $payload["script_file"] ?? "";
$callback = $payload["callback"] ?? "main";
$params = $payload["params"] ?? [];
$GLOBALS["__NEUTRAL_SCHEMA__"] = $payload["schema"] ?? null;
$GLOBALS["__NEUTRAL_SCHEMA_DATA__"] = $payload["schema_data"] ?? null;
$GLOBALS["__NEUTRAL_VENV__"] = $payload["venv"] ?? null;

if (!is_string($script_file) || $script_file === "" || !is_file($script_file)) {
    echo json_encode(["__neutralts_obj_error" => "obj script not found"]);
    exit;
}

try {
    require_once $script_file;
} catch (Throwable $e) {
    echo json_encode(["__neutralts_obj_error" => "php script load failed"]);
    exit;
}

if (!is_callable($callback)) {
    echo json_encode(["__neutralts_obj_error" => "callback not found"]);
    exit;
}

try {
    $result = call_user_func($callback, $params);
} catch (Throwable $e) {
    echo json_encode(["__neutralts_obj_error" => "callback execution failed"]);
    exit;
}

$json = json_encode($result);
if ($json === false) {
    echo json_encode(["__neutralts_obj_error" => "invalid callback response"]);
    exit;
}

echo $json;
"#;
70
/// A parsed PHP-FPM endpoint, as produced by `PhpExecutor::parse_endpoint`.
enum FpmEndpoint {
    /// A resolved TCP socket address.
    Tcp(SocketAddr),
    /// The path that followed the `unix:` prefix of the endpoint string.
    #[cfg(unix)]
    Unix(String),
}
76
/// A connected stream to PHP-FPM, over either transport.
///
/// The `Read` and `Write` implementations forward to the wrapped stream.
enum FpmStream {
    /// Connection over TCP.
    Tcp(TcpStream),
    /// Connection over a Unix domain socket.
    #[cfg(unix)]
    Unix(UnixStream),
}
82
83impl Read for FpmStream {
84    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
85        match self {
86            Self::Tcp(s) => s.read(buf),
87            #[cfg(unix)]
88            Self::Unix(s) => s.read(buf),
89        }
90    }
91}
92
93impl Write for FpmStream {
94    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
95        match self {
96            Self::Tcp(s) => s.write(buf),
97            #[cfg(unix)]
98            Self::Unix(s) => s.write(buf),
99        }
100    }
101
102    fn flush(&mut self) -> std::io::Result<()> {
103        match self {
104            Self::Tcp(s) => s.flush(),
105            #[cfg(unix)]
106            Self::Unix(s) => s.flush(),
107        }
108    }
109}
110
/// Stateless entry point for running a PHP callback through PHP-FPM over FastCGI.
pub struct PhpExecutor;
112
113impl PhpExecutor {
114    pub(crate) fn exec_php(
115        file: &str,
116        params_value: &Value,
117        callback_name: &str,
118        schema: Option<&Value>,
119        schema_data: Option<&Value>,
120        venv_path: Option<&str>,
121        fpm_endpoint: &str,
122    ) -> Result<Value, BifError> {
123        let bridge_path = Self::bridge_path_for(file)?;
124        Self::ensure_bridge_script(&bridge_path)?;
125
126        let endpoint = Self::parse_endpoint(fpm_endpoint).map_err(|e| BifError {
127            msg: e,
128            name: "php_callback".to_string(),
129            file: file.to_string(),
130            src: file.to_string(),
131        })?;
132        let mut stream = Self::connect_endpoint(&endpoint).map_err(|e| BifError {
133            msg: format!("php-fpm connection failed: {}", e),
134            name: "php_callback".to_string(),
135            file: file.to_string(),
136            src: file.to_string(),
137        })?;
138
139        let payload = serde_json::json!({
140            "script_file": file,
141            "callback": callback_name,
142            "params": params_value,
143            "schema": schema.cloned().unwrap_or(Value::Null),
144            "schema_data": schema_data.cloned().unwrap_or(Value::Null),
145            "venv": venv_path.unwrap_or(""),
146        });
147        let body = serde_json::to_vec(&payload).map_err(|e| BifError {
148            msg: format!("invalid php payload: {}", e),
149            name: "php_callback".to_string(),
150            file: file.to_string(),
151            src: file.to_string(),
152        })?;
153
154        Self::send_fastcgi_request(&mut stream, &body, &bridge_path).map_err(|e| BifError {
155            msg: format!("php-fpm request failed: {}", e),
156            name: "php_callback".to_string(),
157            file: file.to_string(),
158            src: file.to_string(),
159        })?;
160
161        let (stdout, stderr) = Self::read_fastcgi_response(&mut stream).map_err(|e| BifError {
162            msg: format!("php-fpm response failed: {}", e),
163            name: "php_callback".to_string(),
164            file: file.to_string(),
165            src: file.to_string(),
166        })?;
167
168        if !stderr.trim().is_empty() {
169            return Err(BifError {
170                msg: format!("php-fpm stderr: {}", stderr.trim()),
171                name: "php_callback".to_string(),
172                file: file.to_string(),
173                src: file.to_string(),
174            });
175        }
176
177        let body = Self::extract_http_body(&stdout);
178        let value: Value = serde_json::from_str(body.trim()).map_err(|e| BifError {
179            msg: format!("invalid php-fpm response: {}", e),
180            name: "php_callback".to_string(),
181            file: file.to_string(),
182            src: file.to_string(),
183        })?;
184
185        if let Some(msg) = value
186            .get(BRIDGE_ERROR_KEY)
187            .and_then(|v| v.as_str())
188            .map(|v| v.to_string())
189        {
190            return Err(BifError {
191                msg,
192                name: "php_callback".to_string(),
193                file: file.to_string(),
194                src: file.to_string(),
195            });
196        }
197
198        Ok(value)
199    }
200
201    fn bridge_path_for(file: &str) -> Result<String, BifError> {
202        let file_path = Path::new(file);
203        let parent = file_path.parent().unwrap_or_else(|| Path::new("."));
204        let parent_abs = if parent.is_absolute() {
205            parent.to_path_buf()
206        } else {
207            std::env::current_dir()
208                .map_err(|e| BifError {
209                    msg: format!("failed to get current_dir: {}", e),
210                    name: "php_callback".to_string(),
211                    file: file.to_string(),
212                    src: file.to_string(),
213                })?
214                .join(parent)
215        };
216        let bridge_path = parent_abs.join(".neutralts_obj_bridge.php");
217        let bridge = bridge_path
218            .to_str()
219            .ok_or_else(|| BifError {
220                msg: "invalid bridge path encoding".to_string(),
221                name: "php_callback".to_string(),
222                file: file.to_string(),
223                src: file.to_string(),
224            })?
225            .to_string();
226        Ok(bridge)
227    }
228
229    fn ensure_bridge_script(bridge_path: &str) -> Result<(), BifError> {
230        if Path::new(bridge_path).exists() {
231            return Ok(());
232        }
233
234        fs::write(bridge_path, BRIDGE_SCRIPT).map_err(|e| BifError {
235            msg: format!("failed to create php bridge: {}", e),
236            name: "php_callback".to_string(),
237            file: "".to_string(),
238            src: "".to_string(),
239        })?;
240        Ok(())
241    }
242
243    fn parse_endpoint(endpoint: &str) -> Result<FpmEndpoint, String> {
244        let endpoint = endpoint.trim();
245        if endpoint.is_empty() {
246            return Err("invalid php-fpm endpoint".to_string());
247        }
248
249        if let Some(path) = endpoint.strip_prefix("unix:") {
250            if path.is_empty() {
251                return Err("invalid php-fpm endpoint".to_string());
252            }
253            #[cfg(unix)]
254            {
255                return Ok(FpmEndpoint::Unix(path.to_string()));
256            }
257            #[cfg(not(unix))]
258            {
259                return Err("unix socket is not supported on this platform".to_string());
260            }
261        }
262
263        let tcp_endpoint = endpoint.strip_prefix("tcp://").unwrap_or(endpoint);
264        if tcp_endpoint.contains(':') {
265            let mut addrs = tcp_endpoint
266                .to_socket_addrs()
267                .map_err(|_| "invalid php-fpm endpoint".to_string())?;
268            if let Some(addr) = addrs.next() {
269                return Ok(FpmEndpoint::Tcp(addr));
270            }
271        }
272
273        Err("invalid php-fpm endpoint".to_string())
274    }
275
276    fn connect_endpoint(endpoint: &FpmEndpoint) -> Result<FpmStream, String> {
277        match endpoint {
278            FpmEndpoint::Tcp(addr) => {
279                let stream = TcpStream::connect_timeout(
280                    addr,
281                    std::time::Duration::from_secs(PHP_FPM_TIMEOUT_SECS),
282                )
283                .map_err(|e| e.to_string())?;
284                stream
285                    .set_read_timeout(Some(std::time::Duration::from_secs(PHP_FPM_TIMEOUT_SECS)))
286                    .map_err(|e| e.to_string())?;
287                stream
288                    .set_write_timeout(Some(std::time::Duration::from_secs(PHP_FPM_TIMEOUT_SECS)))
289                    .map_err(|e| e.to_string())?;
290                Ok(FpmStream::Tcp(stream))
291            }
292            #[cfg(unix)]
293            FpmEndpoint::Unix(path) => {
294                let stream = UnixStream::connect(path).map_err(|e| e.to_string())?;
295                stream
296                    .set_read_timeout(Some(std::time::Duration::from_secs(PHP_FPM_TIMEOUT_SECS)))
297                    .map_err(|e| e.to_string())?;
298                stream
299                    .set_write_timeout(Some(std::time::Duration::from_secs(PHP_FPM_TIMEOUT_SECS)))
300                    .map_err(|e| e.to_string())?;
301                Ok(FpmStream::Unix(stream))
302            }
303        }
304    }
305
306    fn send_fastcgi_request(
307        stream: &mut FpmStream,
308        body: &[u8],
309        bridge_path: &str,
310    ) -> Result<(), String> {
311        let begin_body = [
312            (FCGI_RESPONDER >> 8) as u8,
313            (FCGI_RESPONDER & 0xFF) as u8,
314            0,
315            0,
316            0,
317            0,
318            0,
319            0,
320        ];
321        Self::write_record(stream, FCGI_BEGIN_REQUEST, FCGI_REQUEST_ID, &begin_body)?;
322
323        let params = vec![
324            ("SCRIPT_FILENAME", bridge_path.to_string()),
325            ("SCRIPT_NAME", bridge_path.to_string()),
326            ("REQUEST_METHOD", "POST".to_string()),
327            ("CONTENT_TYPE", "application/json".to_string()),
328            ("CONTENT_LENGTH", body.len().to_string()),
329            ("SERVER_PROTOCOL", "HTTP/1.1".to_string()),
330            ("GATEWAY_INTERFACE", "CGI/1.1".to_string()),
331            ("REQUEST_URI", "/".to_string()),
332            ("DOCUMENT_ROOT", "/".to_string()),
333            ("REMOTE_ADDR", "127.0.0.1".to_string()),
334            ("REMOTE_PORT", "0".to_string()),
335            ("SERVER_ADDR", "127.0.0.1".to_string()),
336            ("SERVER_PORT", "80".to_string()),
337            ("SERVER_NAME", "neutralts".to_string()),
338        ];
339
340        let mut params_buf = Vec::new();
341        for (name, value) in params {
342            Self::write_name_value_pair(&mut params_buf, name.as_bytes(), value.as_bytes())?;
343        }
344
345        if !params_buf.is_empty() {
346            Self::write_record(stream, FCGI_PARAMS, FCGI_REQUEST_ID, &params_buf)?;
347        }
348        Self::write_record(stream, FCGI_PARAMS, FCGI_REQUEST_ID, &[])?;
349
350        if !body.is_empty() {
351            Self::write_record(stream, FCGI_STDIN, FCGI_REQUEST_ID, body)?;
352        }
353        Self::write_record(stream, FCGI_STDIN, FCGI_REQUEST_ID, &[])?;
354        stream.flush().map_err(|e| e.to_string())?;
355        Ok(())
356    }
357
358    fn read_fastcgi_response(stream: &mut FpmStream) -> Result<(String, String), String> {
359        let mut stdout = Vec::new();
360        let mut stderr = Vec::new();
361
362        loop {
363            let mut header = [0u8; 8];
364            match stream.read_exact(&mut header) {
365                Ok(_) => {}
366                Err(e) if e.kind() == ErrorKind::UnexpectedEof => {
367                    // Some FPM setups close the socket immediately after STDOUT/STDERR
368                    // without sending END_REQUEST. If we already have payload, treat as done.
369                    if !stdout.is_empty() || !stderr.is_empty() {
370                        break;
371                    }
372                    return Err(e.to_string());
373                }
374                Err(e) => return Err(e.to_string()),
375            }
376
377            // FastCGI response must start with version 1.
378            // If not, this is likely not a FastCGI endpoint (for example, HTTP server on :9000).
379            if header[0] != FCGI_VERSION_1 {
380                let prefix = String::from_utf8_lossy(&header);
381                if prefix.starts_with("HTTP/") {
382                    return Err(
383                        "invalid FastCGI response: endpoint looks like HTTP, not PHP-FPM"
384                            .to_string(),
385                    );
386                }
387                return Err(format!(
388                    "invalid FastCGI response: unsupported version {}",
389                    header[0]
390                ));
391            }
392
393            let record_type = header[1];
394            let request_id = u16::from_be_bytes([header[2], header[3]]);
395            let content_length = u16::from_be_bytes([header[4], header[5]]) as usize;
396            let padding_length = header[6] as usize;
397
398            let mut content = vec![0u8; content_length];
399            if content_length > 0 {
400                match stream.read_exact(&mut content) {
401                    Ok(_) => {}
402                    Err(e) if e.kind() == ErrorKind::UnexpectedEof => {
403                        if !stdout.is_empty() || !stderr.is_empty() {
404                            break;
405                        }
406                        return Err(e.to_string());
407                    }
408                    Err(e) => return Err(e.to_string()),
409                }
410            }
411            if padding_length > 0 {
412                let mut padding = vec![0u8; padding_length];
413                match stream.read_exact(&mut padding) {
414                    Ok(_) => {}
415                    Err(e) if e.kind() == ErrorKind::UnexpectedEof => {
416                        if !stdout.is_empty() || !stderr.is_empty() {
417                            break;
418                        }
419                        return Err(e.to_string());
420                    }
421                    Err(e) => return Err(e.to_string()),
422                }
423            }
424
425            if request_id != FCGI_REQUEST_ID {
426                continue;
427            }
428
429            match record_type {
430                FCGI_STDOUT => stdout.extend_from_slice(&content),
431                FCGI_STDERR => stderr.extend_from_slice(&content),
432                FCGI_END_REQUEST => break,
433                _ => {}
434            }
435        }
436
437        Ok((
438            String::from_utf8_lossy(&stdout).into_owned(),
439            String::from_utf8_lossy(&stderr).into_owned(),
440        ))
441    }
442
443    fn extract_http_body(stdout: &str) -> &str {
444        if let Some(pos) = stdout.find("\r\n\r\n") {
445            &stdout[pos + 4..]
446        } else if let Some(pos) = stdout.find("\n\n") {
447            &stdout[pos + 2..]
448        } else {
449            stdout
450        }
451    }
452
453    fn write_record(
454        stream: &mut FpmStream,
455        record_type: u8,
456        request_id: u16,
457        content: &[u8],
458    ) -> Result<(), String> {
459        if content.len() > u16::MAX as usize {
460            return Err("fastcgi content too large".to_string());
461        }
462
463        let padding_len = (8 - (content.len() % 8)) % 8;
464        let header = [
465            FCGI_VERSION_1,
466            record_type,
467            (request_id >> 8) as u8,
468            (request_id & 0xFF) as u8,
469            ((content.len() >> 8) & 0xFF) as u8,
470            (content.len() & 0xFF) as u8,
471            padding_len as u8,
472            0,
473        ];
474
475        stream.write_all(&header).map_err(|e| e.to_string())?;
476        if !content.is_empty() {
477            stream.write_all(content).map_err(|e| e.to_string())?;
478        }
479        if padding_len > 0 {
480            let padding = vec![0u8; padding_len];
481            stream.write_all(&padding).map_err(|e| e.to_string())?;
482        }
483        Ok(())
484    }
485
486    fn write_name_value_pair(out: &mut Vec<u8>, name: &[u8], value: &[u8]) -> Result<(), String> {
487        Self::write_length(out, name.len())?;
488        Self::write_length(out, value.len())?;
489        out.extend_from_slice(name);
490        out.extend_from_slice(value);
491        Ok(())
492    }
493
494    fn write_length(out: &mut Vec<u8>, len: usize) -> Result<(), String> {
495        if len < 128 {
496            out.push(len as u8);
497            return Ok(());
498        }
499        if len > 0x7fff_ffff {
500            return Err("fastcgi name/value too large".to_string());
501        }
502        out.push(((len >> 24) as u8) | 0x80);
503        out.push((len >> 16) as u8);
504        out.push((len >> 8) as u8);
505        out.push(len as u8);
506        Ok(())
507    }
508}