Tìm hiểu cơ bản về Redis Streams

I. Tổng Quan

Redis Streams là một cấu trúc dữ liệu trong Redis và được giới thiệu từ phiên bản 5.0. Nó được ra đời do nhu cầu ngày càng cao về xử lý dữ liệu theo thời gian thực, quản lý trạng thái xử lý và phân phối công việc… Điều mà các cấu trúc dữ liệu khác của Redis không thể hoàn toàn đáp ứng như: Redis Lists hiện có thể dùng để xử lý hàng đợi hoặc Redis Pub/Sub hiện có thể dùng xử lý Message Broker nhưng khi có yêu cầu lưu trữ message hoặc quản lý trạng thái hay phân phối công việc thì 2 kiểu dữ liệu này sẽ không đáp ứng được và cần phường pháp bổ sung hoặc công cụ nào đó hỗ trợ.

II. Khái niệm

1. Các khái niệm chính

Streams: Là mỗi chuỗi các entries, mỗi entry trong stream có 1 ID duy nhất và chứa dữ liệu

Entry: Là một đơn vị trong stream nó bao gồm 1 ID và một hoặc nhiều cặp key – value.

ID: Được định dạng milliseconds-sequence trong đó milliseconds là thời gian entry được thêm vào stream và sequence là 1 số tăng dần để có thể phân biết được các entry đưuọc thêm vào stream cùng 1 thời điểm. Từ ID này có thể xác định được thứ tự các entry trong stream.

Consumer Group: Ta có thể hiểu là 1 nhóm consumer(Người tiêu dùng) đọc dữ liệu từ 1 stream. Consumer groups giúp phân phối công việc giữa nhiều worker mà không bị trùng lặp

XREAD: Dùng để đọc dữ liệu các Stream và có thể chỉ định số lượng entry cần đọc

XREADGROUP: Dùng để đọc dữ liệu từ stream được sử dụng đối với consumer groups. Cho phép các consumer chia sẻ và xử lý công việc không bị trùng lặp.

XADD: Dùng để thêm entry vào stream

XDEL: Dùng đẻ xóa entry khỏi stream

Pending Entries List: Dùng để theo dõi và quản lý các entry chưa được xử lý trong consumer group, khi các consumer sử dụng lệnh XREADGROUP các entry ở trạng thái pending này sẽ không còn được phân phối cho các consumer khác nữa để đảm báo 1 entry chỉ có duy nhất 1 consumer xử lý.

Acknowledgement: Dùng để xác nhận 1 consumer đã xử lý 1 entry

2. Ưu điểm & Hạn chế

Ưu điểm:

+ Hiệu suất cao: Cái này thì không phải bàn nữa khi nói về Redis, Redis stream có khả năng xử lý hàng triệu bản ghi mỗi giây nhờ cấu trúc dữ liệu trong bộ nhớ

+ Khả năng phân phối và quản lý trạng thái: Redis stream hỗ trợ consumer group cho phép phân phối các entry cho các consumer trong nhóm giúp ta có khả năng mở rộng và các entry trong stream có thể xử lý đồng thời. Redis cung cấp các lệnh XREADGROUP, XACK để quản lý trạng thái các entry cho phép bạn theo dõi các entry chưa xử lý, đã xử lý và đảm báo các entry chỉ xử lý 1 lần duy nhất.

Hạn chế:

+ Do cần quản lý trạng thái và lưu trữ dữ liệu trong bộ nhớ điều này có thể là hạn chế về vấn đề lưu trữ
+ Khả năng khôi phục: Redis Stream không cung cấp khả năng khôi phục. Hiện nay có rất nhiều cách để khôi phục dữ liệu trong Redis như RDB(Redis Database Backup), AOF(Append-Only File), replication, và Sentinel. Nhưng các cách này đều có xác suất mất dữ liệu sau khi khôi phục, các bạn có thể lên google tìm hiểu thêm.

III. Các lệnh cơ bản

1. XADD (Thêm một mục entry vào stream)

XADD {name_stream} {maxlength?} {ID Entry} {field_entry} {value_entry} ... {field_entry_n} {value_entry_n}
Example:
XADD test_stream MAXLEN ~ 1000 * "name" "QuyenNV" age 18

name_stream: Tên của stream

maxlength: Là tùy chọn có thể có hoặc không, dùng để giới hạn số lượng entry nhất định trong stream và các entry này là mới nhất.

field_entry, value_entry…: Là các cặp key value của entry

2. XREAD (Đọc dữ liệu từ một hoặc nhiều stream)

XREAD {COUNT count?} {BLOCK milliseconds?} {stream} ... {stream_n} {ID} ... {ID of stream n}
Example:
XREAD STREAMS stream 0 //All entry
XREAD COUNT 10 STREAMS stream 0 // Tối đa 10 entry
XREAD BLOCK 5000 STREAMS stream 0 // BLOCKING 5s
XREAD COUNT 10 BLOCK 5000 STREAMS stream 0 //Tối đa 10 entry & BLOCKING 5s
XREAD COUNT 10 BLOCK 5000 STREAMS stream stream_1 0 0 //Tương tự trên nhưng get Multi Stream

COUNT count: Số lượng tối đa entry sẽ đọc trong từng stream

BLOCK milliseconds: Thời gian chờ (ms) nhận entry mới, option này dùng để tạo một lệnh blocking

stream … stream_n: Là tên các stream cần đọc

ID … ID of stream n: Là ID bắt đầu đọc của từng stream. Lưu ý số lượng stream sẽ phải bằng với số lượng ID hoặc ngược lại ($ là ID đặc biệt, Nó sẽ chỉ nhận các entry mới thêm vào stream)

3. XGROUP (Quản lý consumer groups)

XGROUP CREATE {name_stream} {name_group} $ MKSTREAM
Example:
XGROUP CREATE stream group 0 MKSTREAM //Xử lý các entry hiện có
XGROUP CREATE stream group $ MKSTREAM //Chỉ xử lý các entry mới không bao gồm entry cũ
XGROUP CREATE stream group 1725251379707-0 MKSTREAM //Chỉ xử lý các entry từ ID 1725251379707-0

name_stream: Tên của stream

name_group: Tên group

$: Chỉ xử lý các entry mới nhất thêm vào stream không xử lý các entry cũ, nếu bạn muốn xử lý các entry hiện có bạn có thể dùng 0 hoặc bạn muốn xử lý các entry từ ID bạn chỉ định.

MKSTREAM: Tạo Stream mới nếu Stream chưa tồn tại

4. XREADGROUP (Đọc dữ liệu từ một stream qua một consumer group)

XREADGROUP {GROUP group consumer} {COUNT count?} {BLOCK milliseconds?} STREAMS {stream} {ID} 
Example: Đọc All entry trong stream(name_stream) chưa xử lý thuộc consumer(name_consumer)
XREADGROUP GROUP name_group name_consumer STREAMS name_stream 0
XREADGROUP GROUP name_group name_consumer COUNT 2 STREAMS name_stream 0 //Max 2 entry 
XREADGROUP GROUP name_group name_consumer BLOCK 5000 STREAMS name_stream 0 //BLOCKING 5s
XREADGROUP GROUP name_group name_consumer COUNT 2 BLOCK 5000 STREAMS name_stream 0 //Max 2 entry, BLOCK 5s

GROUP group consumer: Tên consumer group và tên consumer đọc.

COUNT count: Số lượng tối đa entry sẽ đọc trong stream

BLOCK milliseconds: Thời gian chờ (ms) nhận entry mới, option này dùng để tạo một lệnh blocking

stream: Tên stream

ID: Là vị trí ID bắt đầu đọc của stream (> là ID đặc biết, consumer chỉ nhận entry chưa từng gửi đến consumer nào)

5. XPENDING (Xem thông tin về các mục đang chờ xử lý)

XPENDING {group} {stream} {start} {end} {count} {consumer?} 
Example:
XPENDING name_group name_Stream - + 1000 //All entry pending
XPENDING name_group name_Stream - + 1000 name_consumer //All entry pending trong consumer

group: Tên group

stream: Tên stream

start: ID bắt đầu trong stream (- là bắt đầu từ ID thấp nhất)

end: ID kết thúc trong stream (+ là kết thúc tại ID cao nhất)

count: Số lượng tối đa entry lấy ra

consumer: Tên consumer

6. XACK (Xác nhận đã xử lý một hoặc nhiều mục trong stream)

XACK {stream} {group} {ID} ... {ID n}
Example:
XACK name_stream name_group 1725258852213-0

stream: Tên stream

group: Tên group

ID … ID n: Một hoặc nhiều ID entry xác nhận đã xử lý

7. XDEL (Xóa một hoặc nhiều mục khỏi stream)

XDEL {stream} {ID} ... {ID n}
Example:
XDEL name_Stream 1725258852213-0

stream: Tên stream

ID … ID n: Một hoặc nhiều ID entry muốn xóa

IV. Ví dụ Redis stream hoạt động

Trên hình các bạn có thể thấy các thành phần chính là: Producer, Redis streams, Consumer group, Consumer

1. Producers (Nhà sản xuất)

Producer thêm entry vào stream bằng lệnh XADD

2. Redis streams

Là các stream trong các stream sẽ lưu trữ các entry mà producer thêm vào và được sắp xếp có thứ tự

3. Consumer group

Classifier 1, Classifier 2… là các consumer trong consumer group. Consumer trong consumer group sẽ sử dụng lệnh XREADGROUP để đọc entry từ stream và xử lý chúng. Sau khi xử lý thành công để thông báo cho Redis rằng message này đã được xử lý và có thể xóa nó khỏi danh sách pending của consumer group ta sẽ gửi xác nhận bằng lệnh XACK.

4. Consumers

Messaging, Analytics, Data Backup là các consumer độc lập không xử lý theo nhóm. Consumer sẽ sử dụng lệnh XREAD để đọc entry và xử lý.

5. VD sử dụng XREAD trong PHP xử lý thời gian thực.

Stream được sắp xếp tuần tự nên khi người dùng sử dụng XREAD và truyền vào ID của entry thì dữ liệu đầu ra sẽ lấy từ ID entry bạn nhập vào trở đi nó sẽ không bao gồm ID của entry bạn nhập.

$lastId = $;// Ta dụng có thể sử dụng 0 (xử lý từ đầu) hoặc $ (Xử lý từ thời điểm chạy)
while (true) {
    // Đọc dữ liệu từ stream thời gian thực
    $result = $redis->xRead(['test_stream' => $lastId], 100, 0, 10000); // BLOCK 10 giây, Tối đa 100 entry

    if ($result) {
        foreach ($result as $stream => $messages) {
            foreach ($messages as $id => $message) {
                //process message
                $lastId = $id; //gắn lại lastid để lấy các bản ghi tiếp theo chưa xử lý
            }
        }
    }
    
    //Đọc dữ liệu sau 1 khoảng thời gian không phải thời gian thực
    //sleep(5); //trường hợp bạn không muốn dùng BLOCK bạn có thể dùng sleep (Tùy nhu câu bài toán)
}

Ví dụ trên xử lý trường hợp mỗi entry chỉ chay duy nhất 1 lần. Nhìn thì ok nhưng vẫn còn rất nhiều trường hợp xảy ra khiến cho entry chưa được xử lý hoặc 1 entry xử lý nhiều lần. Ví dụ như Server die khi chạy lại ứng dụng thì $lastId sẽ bị reset thành 0 hoặc $. Đối với $lastId gán bằng 0 thì entry sẽ bị xử lý nhiều lần, còn đối với $ thì sẽ không xử lý được các entry trước đó. Để khăc phục các bạn có thể làm theo các cách sau: lưu lại $lastId xử lý cuối cùng của stream hoặc sử dụng XDEL xóa đi entry ID đã xử lý. Nhưng lưu ý vẫn có ngoại lệ, ngoại lệ ở đây là process xử lý OK rồi nhưng lưu lại $lastId hoặc thực thi lệnh XDEL gặp vấn đề.

Đối với sử dụng XREADGROUP nó sẽ khác với XREAD là có thể run được nhiều consumer xử lý các entry trong cùng một stream và cùng thời điểm mà không xảy ra trường hợp 1 entry bị xử lý nhiều lần. Các bạn có thể lên google tìm hiểu thêm về XREADGROUP.

V. Kết luận

Trên đây là cơ bản những gì mình tìm hiểu được về Redis Stream. Hy vọng bài viết có thể giúp chúng ta hiểu hơn về những gì mà Redis Stream có thể làm và các thông tin cơ bản về ưu điểm, hạn chế của nó từ đó chúng ta có thể xem xét cũng như lưa chọn sử dụng nó 1 cách phù hợp.

Cảm ơn các bạn đã đọc!

Related Posts