https://doc.rust-lang.org/book/ch20-00-final-project-a-web-server.html
Docs에 있는 예제를 따라 정리한 글임을 미리 밝힙니다.
- 요청했을 때 아래와 같은 웹 화면을 반환하는 웹 서버를 만들어보자!
<img height=“200px” src=https://github.com/rlaisqls/rlaisqls/assets/81006587/d82a5ba0-f623-4f4e-bed0-2e618ac8b055/>
- Rust 프로젝트는 아래와 같이 생성할 수 있다.
Terminal window $ cargo new hello --binCreated binary (application) `hello` project$ cd hello
TCP 연결 처리
- 우선
std::net
를 사용해서 하나의 TCP 연결을 처리하는 코드를 짜볼 것이다. TcpListener
를 사용하여127.0.0.1:7878
주소로 TCP연결을 수신할 수 있다.- 위 코드에서
bind
함수는new
함수처럼 동작하며TcpListner
의 새 인스턴스를 반환한다.- bind 함수는 바인딩의 성공여부를 나타내는
Result<T, E>
를 반환한다.
- bind 함수는 바인딩의 성공여부를 나타내는
TcpListener
의 incoming 메소드는 스트림의 차례에 대한 iterator를 반환한다. 각각의 stream 은 클라이언트와 서버간의 열려있는 커넥션을 의미한다.
use std::net::TcpListener;
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() { let stream = stream.unwrap();
println!("Connection established!"); }}
요청 데이터 읽기
- 브라우저로부터의 요청을 읽는 기능을 구현하기 위해
handle_connection
이라는 함수를 새로 만들어보자.
use std::io::prelude::*;use std::net::TcpStream;use std::net::TcpListener;
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() { let stream = stream.unwrap();
handle_connection(stream); }}
fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
println!("Request: {}", String::from_utf8_lossy(&buffer[..]));}
-
std::io::prelude
를 가져와 스트림으로부터 읽고 쓰는것을 허용하는 특성에 접근할 수 있도록 한다. -
handle_connection
함수에선, stream 매개변수를 가변으로 만들어 줬다. 이유는 TcpStream 인스턴스가 내부에서 어떤 데이터가 우리에게 반환되는지 추적하기 때문이다. 요청하는것에 따라 더 많은 데이터를 읽거나, 다음 요청때까지 데이터를 저장하는 등 내부의 상태가 변경될 수 있기에mut
이 되어야 한다. -
이제 실제로 스트림으로부터 데이터를 읽어보자.
-
데이터를 읽기 위해선 buffer(버퍼)를 읽을 데이터를 저장할 스택에 선언해야 한다. 그리고 버퍼를
stream.read
로 전달하여TcpStream
으로부터 읽어들인 바이트를 버퍼로 집어넣는다. -
이후 버퍼 안에있는 바이트들을 문자열로 변환하고 출력한다.
String::from_utf8_lossy
함수는&[u8]
을 전달받고 String 으로 바꿔서 제공해준다. -
프로그램을 실행하고 TCP로 접속하면 아래와 같이 출력될 것이다.
$ cargo run Compiling hello v0.1.0 (file:///projects/hello) Finished dev [unoptimized + debuginfo] target(s) in 0.42 secs Running `target/debug/hello`Request: GET / HTTP/1.1Host: 127.0.0.1:7878User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64; rv:52.0) Gecko/20100101Firefox/52.0Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8Accept-Language: en-US,en;q=0.5Accept-Encoding: gzip, deflateConnection: keep-aliveUpgrade-Insecure-Requests: 1������������������������������������
HTTP 응답
-
handle_connection
함수의println!
을 지우고 원하는 HTTP 형식의 메시지를 반환해보자. -
우선 간단한 html을 작성한다.
<!DOCTYPE html><html lang="en"><head><meta charset="utf-8"><title>Hello!</title></head><body><h1>Hello!</h1><p>Hi from Rust</p></body></html> -
HTML파일을 읽고, 응답의 body에 추가, 전송하도록 코드를 수정한다.
use std::fs::File;// --생략--fn handle_connection(mut stream: TcpStream) {let mut buffer = [0; 512];stream.read(&mut buffer).unwrap();let mut file = File::open("hello.html").unwrap();let mut contents = String::new();file.read_to_string(&mut contents).unwrap();let response = format!("HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",contents.len(),contents);stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();} -
HTTP Method
가GET
인 경우에만 Hello 페이지를 반환하고, 그렇지 않은 경우 404를 반환하도록 하려면 html을 추가해주고 if문으로 분기처리해주면 된다.// --생략--fn handle_connection(mut stream: TcpStream) {// --생략--let (status_line, filename) = if buffer.starts_with(get) {("HTTP/1.1 200 OK", "hello.html")} else {("HTTP/1.1 404 NOT FOUND", "404.html")};let mut file = File::open(filename).unwrap();let mut contents = String::new();file.read_to_string(&mut contents).unwrap();let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}",status_line,contents.len(),contents);stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();}
멀티스레드로 바꾸기
- 스레드풀은 대기중이거나 작업을 처리할 준비가 되어 있는 스레드들의 그룹이다.
- 요청이 들어온다면, 요청들은 처리를 위해 풀로 보내지고 풀에선 들어오는 요청들에 대한 큐(queue)를 유지하도록 할 것이다.
- 풀 내의 각 스레드들은 이 큐에서 요청을 꺼내서 처리하고 또 다른 요청이 있는지 큐에 물어보며 여러 요청을 동시에 처리한다.
- 구조는 다음과 같다.
- ThreadPool 은 채널을 생성하고 채널의 송신단을 유지한다.
- 각 Worker 는 채널의 수신단을 유지한다.
- 우린 채널로 전송하려는 클로저를 저장할 새로운 Job 구조체를 생성한다.
- execute 메소드는 채널의 송신단으로 실행하려는 작업을 전송한다.
- 스레드에선 Worker 가 채널의 수신단에서 반복되며 수신되는 모든 작업의 클로저를 실행한다.
use std::thread;type Job = Box<FnOnce() + Send + 'static>;
pub struct ThreadPool { workers: Vec<Worker>,}
impl ThreadPool { // --생략--
pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { let job = Box::new(f); self.sender.send(job).unwrap(); }
pub fn new(size: usize) -> ThreadPool { assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size);
for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); }
ThreadPool { workers, sender, } } // --생략--}
struct Worker { id: usize, thread: thread::JoinHandle<()>,}
impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {} got a job; executing.", id); job.call_box(); } });
Worker { id, thread, } }}
Graceful한 종료
-
Graceful한 종료란, 서버 종료 신호를 받았을 때 현재 처리하고 있는 요청을 모두 처리할 때 까지 기다린 후 종료하는 것을 의미한다.
-
풀 안의 각 스레드 상에서
join
을 호출하여 스레드가 종료되기 전에 그들이 처리하던 요청을 마저 처리할 수 있도록 하기 위하여 Drop 트레잇을 구현해보자. 아래와 같이Drop
에서worker
의 Thread들에join
하여 종료하길 기다리도록 하는 것이 목표이다....impl Drop for ThreadPool {fn drop(&mut self) {for worker in &mut self.workers {println!("Shutting down worker {}", worker.id);thread.join().unwrap();}}}... -
하지만 이 코드는 아직 작동하지 않는다. worker를 가변 형태로 빌려왔으니, 인수의 소유권을 필요로 하는
join
을 호출할 수 없다. 이 이슈를 해결하기 위해,join
이 스레드를 사용할 수 있도록 thread의 소유권을Worker
인스턴스로부터 빼내야 한다. -
Worker
가Option<thread::>JoinHandle<()>
를 대신 갖도록 하면,Option
의take
메소드를 사용하여Some
variant에서 값을 빼내고None
으로 대체할 수 있다. 즉, 실행중인Worker
는 thread에Some
variant 를 갖게 되고, 우린worker
를 종료하고자 할때Some
을None
으로 대체하여worker
가 실행할 스레드를 없앨 수 있다....struct Worker {id: usize,thread: Option<thread::JoinHandle<()>>,}impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {// --생략--Worker {id,thread: Some(thread),}}} -
worker
로 부터 thread를 빼내기 위해선Option
에서 take를 호출해야하므로 이에 맞춰drop
함수의 내용을 조금 수정해준다.Some
을 파괴하고 스레드를 얻기 위해if let
를 사용한다.impl Drop for ThreadPool {fn drop(&mut self) {for worker in &mut self.workers {println!("Shutting down worker {}", worker.id);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}} -
Job
이나 리스닝을 멈추고 무한 반복문을 탈출하라는 신호를 기다리도록 스레드를 수정한다. 두variant
를 가진enum
을 만들어 Job 대신 해당enum
을 가지도록 구현해보자....enum Message {NewJob(Job),Terminate,}...pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Message>,}// --생략--impl ThreadPool {// --생략--pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static{let job = Box::new(f);self.sender.send(Message::NewJob(job)).unwrap();}}// --생략--impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->Worker {let thread = thread::spawn(move ||{loop {let message = receiver.lock().unwrap().recv().unwrap();match message {Message::NewJob(job) => {println!("Worker {} got a job; executing.", id);job.call_box();},Message::Terminate => {println!("Worker {} was told to terminate.", id);break;},}}});Worker {id,thread: Some(thread),}}}...// 각 worker 스레드에 join 을 호출하기 전에 Message::Terminate를 전달한다.impl Drop for ThreadPool {fn drop(&mut self) {println!("Sending terminate message to all workers.");for _ in &mut self.workers {self.sender.send(Message::Terminate).unwrap();}println!("Shutting down all workers.");for worker in &mut self.workers {println!("Shutting down worker {}", worker.id);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}}