1use crate::{bif::BifError, Value};
2use std::fs;
3use std::io::{ErrorKind, Read, Write};
4use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
5#[cfg(unix)]
6use std::os::unix::net::UnixStream;
7use std::path::Path;
8
9const FCGI_VERSION_1: u8 = 1;
10const FCGI_BEGIN_REQUEST: u8 = 1;
11const FCGI_END_REQUEST: u8 = 3;
12const FCGI_PARAMS: u8 = 4;
13const FCGI_STDIN: u8 = 5;
14const FCGI_STDOUT: u8 = 6;
15const FCGI_STDERR: u8 = 7;
16const FCGI_RESPONDER: u16 = 1;
17const FCGI_REQUEST_ID: u16 = 1;
18const PHP_FPM_TIMEOUT_SECS: u64 = 5;
19const BRIDGE_ERROR_KEY: &str = "__neutralts_obj_error";
20
21const BRIDGE_SCRIPT: &str = r#"<?php
22header('Content-Type: application/json');
23$raw = file_get_contents('php://input');
24$payload = json_decode($raw, true);
25
26if (!is_array($payload)) {
27 echo json_encode(["__neutralts_obj_error" => "invalid payload"]);
28 exit;
29}
30
31$script_file = $payload["script_file"] ?? "";
32$callback = $payload["callback"] ?? "main";
33$params = $payload["params"] ?? [];
34$GLOBALS["__NEUTRAL_SCHEMA__"] = $payload["schema"] ?? null;
35$GLOBALS["__NEUTRAL_SCHEMA_DATA__"] = $payload["schema_data"] ?? null;
36$GLOBALS["__NEUTRAL_VENV__"] = $payload["venv"] ?? null;
37
38if (!is_string($script_file) || $script_file === "" || !is_file($script_file)) {
39 echo json_encode(["__neutralts_obj_error" => "obj script not found"]);
40 exit;
41}
42
43try {
44 require_once $script_file;
45} catch (Throwable $e) {
46 echo json_encode(["__neutralts_obj_error" => "php script load failed"]);
47 exit;
48}
49
50if (!is_callable($callback)) {
51 echo json_encode(["__neutralts_obj_error" => "callback not found"]);
52 exit;
53}
54
55try {
56 $result = call_user_func($callback, $params);
57} catch (Throwable $e) {
58 echo json_encode(["__neutralts_obj_error" => "callback execution failed"]);
59 exit;
60}
61
62$json = json_encode($result);
63if ($json === false) {
64 echo json_encode(["__neutralts_obj_error" => "invalid callback response"]);
65 exit;
66}
67
68echo $json;
69"#;
70
71enum FpmEndpoint {
72 Tcp(SocketAddr),
73 #[cfg(unix)]
74 Unix(String),
75}
76
77enum FpmStream {
78 Tcp(TcpStream),
79 #[cfg(unix)]
80 Unix(UnixStream),
81}
82
83impl Read for FpmStream {
84 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
85 match self {
86 Self::Tcp(s) => s.read(buf),
87 #[cfg(unix)]
88 Self::Unix(s) => s.read(buf),
89 }
90 }
91}
92
93impl Write for FpmStream {
94 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
95 match self {
96 Self::Tcp(s) => s.write(buf),
97 #[cfg(unix)]
98 Self::Unix(s) => s.write(buf),
99 }
100 }
101
102 fn flush(&mut self) -> std::io::Result<()> {
103 match self {
104 Self::Tcp(s) => s.flush(),
105 #[cfg(unix)]
106 Self::Unix(s) => s.flush(),
107 }
108 }
109}
110
111pub struct PhpExecutor;
112
113impl PhpExecutor {
114 pub(crate) fn exec_php(
115 file: &str,
116 params_value: &Value,
117 callback_name: &str,
118 schema: Option<&Value>,
119 schema_data: Option<&Value>,
120 venv_path: Option<&str>,
121 fpm_endpoint: &str,
122 ) -> Result<Value, BifError> {
123 let bridge_path = Self::bridge_path_for(file)?;
124 Self::ensure_bridge_script(&bridge_path)?;
125
126 let endpoint = Self::parse_endpoint(fpm_endpoint).map_err(|e| BifError {
127 msg: e,
128 name: "php_callback".to_string(),
129 file: file.to_string(),
130 src: file.to_string(),
131 })?;
132 let mut stream = Self::connect_endpoint(&endpoint).map_err(|e| BifError {
133 msg: format!("php-fpm connection failed: {}", e),
134 name: "php_callback".to_string(),
135 file: file.to_string(),
136 src: file.to_string(),
137 })?;
138
139 let payload = serde_json::json!({
140 "script_file": file,
141 "callback": callback_name,
142 "params": params_value,
143 "schema": schema.cloned().unwrap_or(Value::Null),
144 "schema_data": schema_data.cloned().unwrap_or(Value::Null),
145 "venv": venv_path.unwrap_or(""),
146 });
147 let body = serde_json::to_vec(&payload).map_err(|e| BifError {
148 msg: format!("invalid php payload: {}", e),
149 name: "php_callback".to_string(),
150 file: file.to_string(),
151 src: file.to_string(),
152 })?;
153
154 Self::send_fastcgi_request(&mut stream, &body, &bridge_path).map_err(|e| BifError {
155 msg: format!("php-fpm request failed: {}", e),
156 name: "php_callback".to_string(),
157 file: file.to_string(),
158 src: file.to_string(),
159 })?;
160
161 let (stdout, stderr) = Self::read_fastcgi_response(&mut stream).map_err(|e| BifError {
162 msg: format!("php-fpm response failed: {}", e),
163 name: "php_callback".to_string(),
164 file: file.to_string(),
165 src: file.to_string(),
166 })?;
167
168 if !stderr.trim().is_empty() {
169 return Err(BifError {
170 msg: format!("php-fpm stderr: {}", stderr.trim()),
171 name: "php_callback".to_string(),
172 file: file.to_string(),
173 src: file.to_string(),
174 });
175 }
176
177 let body = Self::extract_http_body(&stdout);
178 let value: Value = serde_json::from_str(body.trim()).map_err(|e| BifError {
179 msg: format!("invalid php-fpm response: {}", e),
180 name: "php_callback".to_string(),
181 file: file.to_string(),
182 src: file.to_string(),
183 })?;
184
185 if let Some(msg) = value
186 .get(BRIDGE_ERROR_KEY)
187 .and_then(|v| v.as_str())
188 .map(|v| v.to_string())
189 {
190 return Err(BifError {
191 msg,
192 name: "php_callback".to_string(),
193 file: file.to_string(),
194 src: file.to_string(),
195 });
196 }
197
198 Ok(value)
199 }
200
201 fn bridge_path_for(file: &str) -> Result<String, BifError> {
202 let file_path = Path::new(file);
203 let parent = file_path.parent().unwrap_or_else(|| Path::new("."));
204 let parent_abs = if parent.is_absolute() {
205 parent.to_path_buf()
206 } else {
207 std::env::current_dir()
208 .map_err(|e| BifError {
209 msg: format!("failed to get current_dir: {}", e),
210 name: "php_callback".to_string(),
211 file: file.to_string(),
212 src: file.to_string(),
213 })?
214 .join(parent)
215 };
216 let bridge_path = parent_abs.join(".neutralts_obj_bridge.php");
217 let bridge = bridge_path
218 .to_str()
219 .ok_or_else(|| BifError {
220 msg: "invalid bridge path encoding".to_string(),
221 name: "php_callback".to_string(),
222 file: file.to_string(),
223 src: file.to_string(),
224 })?
225 .to_string();
226 Ok(bridge)
227 }
228
229 fn ensure_bridge_script(bridge_path: &str) -> Result<(), BifError> {
230 if Path::new(bridge_path).exists() {
231 return Ok(());
232 }
233
234 fs::write(bridge_path, BRIDGE_SCRIPT).map_err(|e| BifError {
235 msg: format!("failed to create php bridge: {}", e),
236 name: "php_callback".to_string(),
237 file: "".to_string(),
238 src: "".to_string(),
239 })?;
240 Ok(())
241 }
242
243 fn parse_endpoint(endpoint: &str) -> Result<FpmEndpoint, String> {
244 let endpoint = endpoint.trim();
245 if endpoint.is_empty() {
246 return Err("invalid php-fpm endpoint".to_string());
247 }
248
249 if let Some(path) = endpoint.strip_prefix("unix:") {
250 if path.is_empty() {
251 return Err("invalid php-fpm endpoint".to_string());
252 }
253 #[cfg(unix)]
254 {
255 return Ok(FpmEndpoint::Unix(path.to_string()));
256 }
257 #[cfg(not(unix))]
258 {
259 return Err("unix socket is not supported on this platform".to_string());
260 }
261 }
262
263 let tcp_endpoint = endpoint.strip_prefix("tcp://").unwrap_or(endpoint);
264 if tcp_endpoint.contains(':') {
265 let mut addrs = tcp_endpoint
266 .to_socket_addrs()
267 .map_err(|_| "invalid php-fpm endpoint".to_string())?;
268 if let Some(addr) = addrs.next() {
269 return Ok(FpmEndpoint::Tcp(addr));
270 }
271 }
272
273 Err("invalid php-fpm endpoint".to_string())
274 }
275
276 fn connect_endpoint(endpoint: &FpmEndpoint) -> Result<FpmStream, String> {
277 match endpoint {
278 FpmEndpoint::Tcp(addr) => {
279 let stream = TcpStream::connect_timeout(
280 addr,
281 std::time::Duration::from_secs(PHP_FPM_TIMEOUT_SECS),
282 )
283 .map_err(|e| e.to_string())?;
284 stream
285 .set_read_timeout(Some(std::time::Duration::from_secs(PHP_FPM_TIMEOUT_SECS)))
286 .map_err(|e| e.to_string())?;
287 stream
288 .set_write_timeout(Some(std::time::Duration::from_secs(PHP_FPM_TIMEOUT_SECS)))
289 .map_err(|e| e.to_string())?;
290 Ok(FpmStream::Tcp(stream))
291 }
292 #[cfg(unix)]
293 FpmEndpoint::Unix(path) => {
294 let stream = UnixStream::connect(path).map_err(|e| e.to_string())?;
295 stream
296 .set_read_timeout(Some(std::time::Duration::from_secs(PHP_FPM_TIMEOUT_SECS)))
297 .map_err(|e| e.to_string())?;
298 stream
299 .set_write_timeout(Some(std::time::Duration::from_secs(PHP_FPM_TIMEOUT_SECS)))
300 .map_err(|e| e.to_string())?;
301 Ok(FpmStream::Unix(stream))
302 }
303 }
304 }
305
306 fn send_fastcgi_request(
307 stream: &mut FpmStream,
308 body: &[u8],
309 bridge_path: &str,
310 ) -> Result<(), String> {
311 let begin_body = [
312 (FCGI_RESPONDER >> 8) as u8,
313 (FCGI_RESPONDER & 0xFF) as u8,
314 0,
315 0,
316 0,
317 0,
318 0,
319 0,
320 ];
321 Self::write_record(stream, FCGI_BEGIN_REQUEST, FCGI_REQUEST_ID, &begin_body)?;
322
323 let params = vec![
324 ("SCRIPT_FILENAME", bridge_path.to_string()),
325 ("SCRIPT_NAME", bridge_path.to_string()),
326 ("REQUEST_METHOD", "POST".to_string()),
327 ("CONTENT_TYPE", "application/json".to_string()),
328 ("CONTENT_LENGTH", body.len().to_string()),
329 ("SERVER_PROTOCOL", "HTTP/1.1".to_string()),
330 ("GATEWAY_INTERFACE", "CGI/1.1".to_string()),
331 ("REQUEST_URI", "/".to_string()),
332 ("DOCUMENT_ROOT", "/".to_string()),
333 ("REMOTE_ADDR", "127.0.0.1".to_string()),
334 ("REMOTE_PORT", "0".to_string()),
335 ("SERVER_ADDR", "127.0.0.1".to_string()),
336 ("SERVER_PORT", "80".to_string()),
337 ("SERVER_NAME", "neutralts".to_string()),
338 ];
339
340 let mut params_buf = Vec::new();
341 for (name, value) in params {
342 Self::write_name_value_pair(&mut params_buf, name.as_bytes(), value.as_bytes())?;
343 }
344
345 if !params_buf.is_empty() {
346 Self::write_record(stream, FCGI_PARAMS, FCGI_REQUEST_ID, ¶ms_buf)?;
347 }
348 Self::write_record(stream, FCGI_PARAMS, FCGI_REQUEST_ID, &[])?;
349
350 if !body.is_empty() {
351 Self::write_record(stream, FCGI_STDIN, FCGI_REQUEST_ID, body)?;
352 }
353 Self::write_record(stream, FCGI_STDIN, FCGI_REQUEST_ID, &[])?;
354 stream.flush().map_err(|e| e.to_string())?;
355 Ok(())
356 }
357
358 fn read_fastcgi_response(stream: &mut FpmStream) -> Result<(String, String), String> {
359 let mut stdout = Vec::new();
360 let mut stderr = Vec::new();
361
362 loop {
363 let mut header = [0u8; 8];
364 match stream.read_exact(&mut header) {
365 Ok(_) => {}
366 Err(e) if e.kind() == ErrorKind::UnexpectedEof => {
367 if !stdout.is_empty() || !stderr.is_empty() {
370 break;
371 }
372 return Err(e.to_string());
373 }
374 Err(e) => return Err(e.to_string()),
375 }
376
377 if header[0] != FCGI_VERSION_1 {
380 let prefix = String::from_utf8_lossy(&header);
381 if prefix.starts_with("HTTP/") {
382 return Err(
383 "invalid FastCGI response: endpoint looks like HTTP, not PHP-FPM"
384 .to_string(),
385 );
386 }
387 return Err(format!(
388 "invalid FastCGI response: unsupported version {}",
389 header[0]
390 ));
391 }
392
393 let record_type = header[1];
394 let request_id = u16::from_be_bytes([header[2], header[3]]);
395 let content_length = u16::from_be_bytes([header[4], header[5]]) as usize;
396 let padding_length = header[6] as usize;
397
398 let mut content = vec![0u8; content_length];
399 if content_length > 0 {
400 match stream.read_exact(&mut content) {
401 Ok(_) => {}
402 Err(e) if e.kind() == ErrorKind::UnexpectedEof => {
403 if !stdout.is_empty() || !stderr.is_empty() {
404 break;
405 }
406 return Err(e.to_string());
407 }
408 Err(e) => return Err(e.to_string()),
409 }
410 }
411 if padding_length > 0 {
412 let mut padding = vec![0u8; padding_length];
413 match stream.read_exact(&mut padding) {
414 Ok(_) => {}
415 Err(e) if e.kind() == ErrorKind::UnexpectedEof => {
416 if !stdout.is_empty() || !stderr.is_empty() {
417 break;
418 }
419 return Err(e.to_string());
420 }
421 Err(e) => return Err(e.to_string()),
422 }
423 }
424
425 if request_id != FCGI_REQUEST_ID {
426 continue;
427 }
428
429 match record_type {
430 FCGI_STDOUT => stdout.extend_from_slice(&content),
431 FCGI_STDERR => stderr.extend_from_slice(&content),
432 FCGI_END_REQUEST => break,
433 _ => {}
434 }
435 }
436
437 Ok((
438 String::from_utf8_lossy(&stdout).into_owned(),
439 String::from_utf8_lossy(&stderr).into_owned(),
440 ))
441 }
442
443 fn extract_http_body(stdout: &str) -> &str {
444 if let Some(pos) = stdout.find("\r\n\r\n") {
445 &stdout[pos + 4..]
446 } else if let Some(pos) = stdout.find("\n\n") {
447 &stdout[pos + 2..]
448 } else {
449 stdout
450 }
451 }
452
453 fn write_record(
454 stream: &mut FpmStream,
455 record_type: u8,
456 request_id: u16,
457 content: &[u8],
458 ) -> Result<(), String> {
459 if content.len() > u16::MAX as usize {
460 return Err("fastcgi content too large".to_string());
461 }
462
463 let padding_len = (8 - (content.len() % 8)) % 8;
464 let header = [
465 FCGI_VERSION_1,
466 record_type,
467 (request_id >> 8) as u8,
468 (request_id & 0xFF) as u8,
469 ((content.len() >> 8) & 0xFF) as u8,
470 (content.len() & 0xFF) as u8,
471 padding_len as u8,
472 0,
473 ];
474
475 stream.write_all(&header).map_err(|e| e.to_string())?;
476 if !content.is_empty() {
477 stream.write_all(content).map_err(|e| e.to_string())?;
478 }
479 if padding_len > 0 {
480 let padding = vec![0u8; padding_len];
481 stream.write_all(&padding).map_err(|e| e.to_string())?;
482 }
483 Ok(())
484 }
485
486 fn write_name_value_pair(out: &mut Vec<u8>, name: &[u8], value: &[u8]) -> Result<(), String> {
487 Self::write_length(out, name.len())?;
488 Self::write_length(out, value.len())?;
489 out.extend_from_slice(name);
490 out.extend_from_slice(value);
491 Ok(())
492 }
493
494 fn write_length(out: &mut Vec<u8>, len: usize) -> Result<(), String> {
495 if len < 128 {
496 out.push(len as u8);
497 return Ok(());
498 }
499 if len > 0x7fff_ffff {
500 return Err("fastcgi name/value too large".to_string());
501 }
502 out.push(((len >> 24) as u8) | 0x80);
503 out.push((len >> 16) as u8);
504 out.push((len >> 8) as u8);
505 out.push(len as u8);
506 Ok(())
507 }
508}