M1/M2 32GB RAM の Mac でベストなローカル LLM

ここ 2週間ほど、無料でローカルバイブコーディングができる OpenCode で使うのに最適なローカル LLM を探す沼にハマってました。で、苦労の末、ナウのベストを見つけました。というわけで、oMLX で量子化や設定の微調整を行い、ベンチマークで比較した結果を公開します。

すべて Mac Studio M2 Max / 32 GB RAM /30-Core GPU で試した内容なので、M1/M2 で 32 GB 以上の RAM を積んでいればそのまま使えると思います。CPU の世代と RAM のサイズによって、ご自身で最適な変更をしてみてください。

たとえば dtype の FP16 (float16) は M1/M2 に最適なので、M3 以降では BF16 にすればよいですし、量子化は 4bit が性能を保てる限界なので、大きな RAM を持っていて性能が欲しい人は、oQ レベルを上げましょう。もちろん oQ4 でコンテキストサイズを大きくするのもアリ。32 GB 未満の RAM の場合 Qwen3.5-9B oQ6 あたりなら動きそうですが、テストしていないのであしからず。oMLX なら実 RAM サイズに合うように大きめモデルを量子化できるので、その辺も書いてます。

実行環境: MacOS VRAM と oMLX の設定

MacOS は Tahoe です。GPU が使えるメモリのサイズを 26 GB に増やしています。永続化するコマンドは以下。その下の記事には細かいことを書いています。

Zsh
sudo touch /etc/sysctl.conf
sudo chown root:wheel /etc/sysctl.conf
sudo chmod 0644 /etc/sysctl.conf
echo "iogpu.wired_limit_mb=26624" >> /etc/sysctl.conf

oMLX は v0.3.12 Tahoe版 .dmg をダウンロードして Applications フォルダに入れて使っています。頻繁にアップデートされており、バージョンが違うと結果が異なる可能性もありえます。

v0.3.12 リリースページ (下の Assets の中に .dmg があります)

Qwen3.6 の oQ 量子化はなぜかこのバージョン v0.3.12 だと途中でコケるので、少し前のリリース候補 v0.3.9rc1 を使いました。量子化が必要なときは v0.3.12 を Quit し、マウントした v0.3.9rc1 の .dmg から直接 oMLX を立ち上げる、というやり方です。ま、そのうち修正されるでしょうけど。Gemma-4 の oQ 量子化は v0.3.12 でもできました。

v0.3.9rc1 リリースページ (Qwen3.6 の量子化がコケる場合はこちら)

oMLX の Global Settings でいくつか変更を行っています。太字はやったほうが良いと思います。パラメータは今後知見がたまると変更する可能性があります。速い外付け SSD はあったほうが良いでしょう。 表示が全部英語ですみません。

SERVER

  • Host: Open to all (0.0.0.0) ← LAN の他の PC からアクセスしたい場合
  • Port: 8585 ← 一例。他のサービスと被らなければなんでも OK

RESOURESE MAGEMENT

  • Memory guard: 有効にして Aggressive に変更
  • Hot Cache Limit (In-Memory Cache): 6% (2GB)
  • Cold Cache Limit (SSD Cache): 75% (715GB) ← これは SSD の空きに応じて設定してください
  • Max Concurrent Requests: 2
  • Idle Timeout: 1 hour

CACHE

  • Cache Enabled: 有効
  • SSD Cache Directory: 外付け SSD を指定 (例:/Volumes/SSD/oMLX/.omlx/cache)

GENERATION DEFAULTS

  • Max Context Window: 65536 ← これより大きいプロンプトだとはじくようにします
  • Max Tokens: 65536 ← 別の話ですが、Context Window と合わせてこれより小さいと OpenCode で Compaction が走りまくって先に進まないため、最低限これくらいは必要っぽいです

最後に Save Settings して、Restart server します。一部の設定を反映するためにはリスタートが必須です。

以下すべてはオリジナルモデルを 32 GB RAM の Mac で oMLX を使って量子化したものを使っています。また、ベンチマークは oMLX の Inteligence Benchimark のサブセットを実行した結果を添えています: HumanEval (30/164) MBPP (100/500)。これだけでも完了するまで数時間かかるので、誰かが実行したベンチマークを調べるのも良いでしょう:

Community Benchmarks https://omlx.ai/benchmarks

(oMLX でベンチマークが完了すると勝手にアップロードされます。ただし、TurboQuant を有効化していると、通常設定じゃないから、みたいなことでアップされません)

オススメモデルの紹介

というわけで、2026年 5月末現在の、ボク調べ、特徴別イケてるローカル LLM は以下となります。設定はコーディング寄りで、Vision の有り無しは速度への影響を比較した結果です。モデルサイズは、oMLX の Models > Manager に表示される値を持ってきています。

注意!モデル名に VL と入っているものは、自分がモデルを選ぶときに Vision 対応とわかるよう、書き出し後にフォルダ名を変更しているためです!編集ミスしそうなのであらかじめお詫びをしておき、モデル名はボクの環境のものをそのままコピペします。モデルが保存されているフォルダは Global Settings の Model Directories にあります (一般的な場所)。ここを開くコマンドはこちら:

Zsh
open ~/.cache/huggingface/hub/
最初に書いておきますが Gemma-4 は入っていません。一度もベンチマークを完遂できなかったからです。できたのは簡単なチャットくらい。また、Qwen シリーズを Claude Opus で再トレーニングし性能を爆上げしたというタイプもいくつか試したのですが、oMLX では期待したほどの結果は出ませんでした。

Qwen3.6-35B-A3B-VL-oQ4-fp16 /高速・高性能 (だがメモリ食い)

ソースモデル: Qwen/Qwen3.6-35B-A3B

oMLX での oQ 量子化設定:

  • OQ LEVEL: oQ4
  • Text Only: 無効 (Vision あり)
  • Preserve MTP weights: 無効 (MTP なし)
  • Non-quant weight dtype: float16

モデルサイズ: 19.67 GB

量子化完了後の Model Settings:

  • Preset: qwen3.5/6(r, code)
  • TEMPERATURE: 0.8
  • Enable Thinking: 有効
  • Chat Template Kwargs に追加
    • preserve_thinking
    • true
  • TurboQuant KV Cache: 有効
    • BITS PER CHANNEL: 4-bit

Intelligence Benchmark:

Model: Qwen3.6-35B-A3B-VL-oQ4-fp16
Benchmark         Accuracy   Correct   Total   Time(s)   Think
--------------------------------------------------------------
HUMANEVAL            93.3%        28      30     882.2     Yes
MBPP                 95.0%        95     100    2874.5     Yes

Performance Benchmark:

oMLX - LLM inference, optimized for your Mac
https://github.com/jundot/omlx
Benchmark Model: Qwen3.6-35B-A3B-VL-oQ4-fp16
================================================================================

Single Request Results
--------------------------------------------------------------------------------
Test                TTFT(ms)    TPOT(ms)        pp TPS        tg TPS      E2E(s)    Throughput    Peak Mem
pp1024/tg128          1258.3       12.78   813.8 tok/s    78.9 tok/s       2.881   399.9 tok/s    19.45 GB
pp4096/tg128          4331.4       12.89   945.7 tok/s    78.2 tok/s       5.969   707.7 tok/s    19.57 GB
pp8192/tg128          8698.0       13.55   941.8 tok/s    74.4 tok/s      10.419   798.6 tok/s    19.68 GB
pp16384/tg128        18472.8       14.65   886.9 tok/s    68.8 tok/s      20.333   812.1 tok/s    20.18 GB

Continuous Batching
pp1024 / tg128
--------------------------------------------------------------------------------
Batch           tg TPS   Speedup        pp TPS    pp TPS/req    TTFT(ms)      E2E(s)
1x          78.9 tok/s     1.00x   813.8 tok/s   813.8 tok/s      1258.3       2.881
2x         116.5 tok/s     1.48x   577.4 tok/s   288.7 tok/s      3418.6       5.744
4x         235.1 tok/s     2.98x   564.1 tok/s   141.0 tok/s      4778.1       9.438
8x         473.1 tok/s     6.00x   488.9 tok/s    61.1 tok/s      9522.4      18.921

ハイライト: Vision ありで書き出した方が速かった (MLX-VLM のが速い?)。MTP は MoE の高速化に寄与しなかったため削除。試した中では速度と性能がトップ。MBPP (Python のベンチマーク)で 95% はありがたい。実際に使うとメモリの使用量が常に 20 GB を超えるような状況が続くが、oMLX のメモリ管理が良くなってきて、落ちるとか Mac 自体のパニック再起動とかもほぼなし。当面は安定のベストモデル。メモリ確保に oQ3.5 で書き出したら性能は HE: 86.7% / MBPP: 83.0% に低下。だったら Qwen3.5 9B のがよい。

Qwen3.6-27B-MLX-oQ4-FP16 /メモリ使用量控えめで高性能 (だが遅い)

ソースモデル: Qwen/Qwen3.6-27B

oMLX での oQ 量子化設定:

  • OQ LEVEL: oQ4
  • Text Only: 有効 (Vision なし)
  • Preserve MTP weights: 有効 (MTP あり)
  • Non-quant weight dtype: float16

モデルサイズ: 15.21 GB

量子化完了後の Model Settings:

  • Preset: qwen3.5/6(r, code)
  • TEMPERATURE: 0.8
  • Enable Thinking: 有効
  • Chat Template Kwargs に追加
    • preserve_thinking
    • true
  • Native MTP: 有効

Intelligence Benchmark:

Model: Qwen3.6-27B-oQ4-fp16-mtp 
Benchmark         Accuracy   Correct   Total   Time(s)   Think
--------------------------------------------------------------
HUMANEVAL            90.0%        27      30    3877.4     Yes
MBPP                 92.0%        92     100   13400.2     Yes

Performance Benchmark:

oMLX - LLM inference, optimized for your Mac
https://github.com/jundot/omlx
Benchmark Model: Qwen3.6-27B-oQ4-fp16-mtp
================================================================================

Single Request Results
--------------------------------------------------------------------------------
Test                TTFT(ms)    TPOT(ms)        pp TPS        tg TPS      E2E(s)    Throughput    Peak Mem
pp1024/tg128          6725.0       41.87   152.3 tok/s    24.1 tok/s      12.042    95.7 tok/s    16.95 GB
pp4096/tg128         25971.6       42.54   157.7 tok/s    23.7 tok/s      31.374   134.6 tok/s    18.05 GB
pp8192/tg128         52299.5       45.66   156.6 tok/s    22.1 tok/s      58.098   143.2 tok/s    18.46 GB
pp16384/tg128       107967.0       51.96   151.8 tok/s    19.4 tok/s     114.565   144.1 tok/s    18.28 GB

Continuous Batching
pp1024 / tg128
--------------------------------------------------------------------------------
Batch           tg TPS   Speedup        pp TPS    pp TPS/req    TTFT(ms)      E2E(s)
1x          24.1 tok/s     1.00x   152.3 tok/s   152.3 tok/s      6725.0      12.042
2x          29.4 tok/s     1.22x   125.1 tok/s    62.5 tok/s     12619.3      25.091
4x          57.4 tok/s     2.38x   113.7 tok/s    28.4 tok/s     24562.2      44.939
8x         112.6 tok/s     4.67x   101.1 tok/s    12.6 tok/s     47096.9      90.086

ハイライト: MTP が高速化に寄与。モデル自体のサイズが 16 GB 程度なので、コンテキストサイズを大きめにできる。Native MTP を無効にし、TurboQuant を 4-bit などとする事で、多少の速度を犠牲にさらに大きなコンテキストが使えるのも強み。性能なら MoE より Dense でしょ!と当初 OpenCode で使ってましたが、将棋のタイトル戦を見ているような気分でした (良い手を指してくるけど毎回長考に入るので)。性能を求めて oQ5 で書き出しても 32 GB RAM で動きます。MBPP は驚異の 95% だけど 10% 以上遅くなるので正直キツい。

Qwen3.5-9B-VL-oQ8-fp16-mtp /十分な速度と極小使用メモリ (性能も OK)

ソースモデル: Qwen/Qwen3.5-9B

(Qwen3.6 シリーズは 27B が最小なので、ひとつ前の Qwen3.5)

oMLX での oQ 量子化設定:

  • OQ LEVEL: oQ8
  • Text Only: 無効 (Vision あり)
  • Preserve MTP weights: 有効 (MTP あり)
  • Non-quant weight dtype: float16

モデルサイズ: 10.00 GB

量子化完了後の Model Settings:

  • Preset: qwen3.5/6(r, code)
  • TEMPERATURE: 0.8
  • Enable Thinking: 有効
  • Chat Template Kwargs に追加
    • preserve_thinking
    • true
  • Native MTP: 有効

Intelligence Benchmark:

Model: Qwen3.5-9B-VL-oQ8-fp16-mtp
Benchmark         Accuracy   Correct   Total   Time(s)   Think
--------------------------------------------------------------
HUMANEVAL            90.0%        27      30    3094.9     Yes
MBPP                 82.0%        82     100     10340     Yes

Performance Benchmark:

oMLX - LLM inference, optimized for your Mac
https://github.com/jundot/omlx
Benchmark Model: Qwen3.5-9B-VL-oQ8-fp16-mtp
================================================================================

Single Request Results
--------------------------------------------------------------------------------
Test                TTFT(ms)    TPOT(ms)        pp TPS        tg TPS      E2E(s)    Throughput    Peak Mem
pp1024/tg128          2053.9       20.43   498.6 tok/s    49.3 tok/s       4.649   247.8 tok/s    10.18 GB
pp4096/tg128          7411.8       21.15   552.6 tok/s    47.7 tok/s      10.098   418.3 tok/s    10.86 GB
pp8192/tg128         14856.2       22.19   551.4 tok/s    45.4 tok/s      17.674   470.8 tok/s    11.41 GB
pp16384/tg128        30293.1       23.44   540.8 tok/s    43.0 tok/s      33.270   496.3 tok/s    12.29 GB

Continuous Batching
pp1024 / tg128
--------------------------------------------------------------------------------
Batch           tg TPS   Speedup        pp TPS    pp TPS/req    TTFT(ms)      E2E(s)
1x          49.3 tok/s     1.00x   498.6 tok/s   498.6 tok/s      2053.9       4.649
2x          68.0 tok/s     1.38x   445.2 tok/s   222.6 tok/s      4454.3       8.367
4x         135.5 tok/s     2.75x   337.1 tok/s    84.3 tok/s      8013.8      15.928
8x         268.7 tok/s     5.45x   293.4 tok/s    36.7 tok/s     15931.5      31.730

ハイライト: Vision あり・なしでの有意な違いはない。50トークン/秒近いスピードで、メモリも 10 GB とちょっとしか使わない。MPBB (Python) 82% で構わない用途なら全然あり。Vision ありで書き出すなら、Enable Thinking を無効にして大量の画像のインデックスを作らせるとか。逆に Vision と MTP を外して書き出し、利用できるメモリを最大にする/小さな RAM で動かす、というのもあり。別のモデルですが、Qwopus3.5-9B-v3 は MBPP が 85% に微増。

あとがき

32 GB RAM の Mac だと、大きなコンテキスト (大きなコードや文章) を扱わないのであれば、35B-A3B がベストだと思います。バイブコーディングをしているときなどは、コードが大きくなってメモリに収まらなくなってきたら 27B や 9B に切り替える感じでしょうか。OpenCode でテトリスくらいのものを作るなら 35B-A3B が速くて性能も良くて、なかなか快適でした。https://artificialanalysis.ai/ で比較すると Coding Index は 27B のが上なんですけどね。

とりあえず色々とスッキリしたので、しばらくはバイブコーディングに時間を使おうと思います。

oMLX についてもうちょっと

ここ数日はずっと oMLX をいじっています。oMLX とは Ollama や LM Studio のように、Mac でローカル LLM を実行する環境ですが、めっちゃいいです。GUI が web ブラウザで開ける、GUI からサーバが再起動ができる、モデルのダウンロードや oQ 量子化が簡単にできる、M1/M2 チップで速く動く FP16 へも変換できる、Vision や MTP 機能を外して少し小さくできる、各種ベンチマークが簡単に実行・共有できる、Claude Code / Codex / OpenClaw / OpenCode 等コーディングエージェントの LLM 実行環境として使える、KV キャッシュを外付け SSD 等に置ける、更新が頻繁で新しい技術が早く試せる、等など。ボクは Ollama と Open WebUI の自動起動をやめ、別の Mac で動いている Dify を使うこともなくなりました。

oLMX による oQ 量子化と FP16

oMLX は、量子化などがされる前のモデルを oQ 量子化することができます。元のサイズは Mac の RAM に乗り切る必要はありません。なので、元の LLM 自体のファイルサイズが 60 GB を超えるくらいでも、がんばってダウンロードして oQ4 (4 bit) 量子化してしまえば 20 GB 以下にはなります。つまり、32 GB RAM の Mac であれば設定で GPU に RAM を 24~26 GB くらい割り当てられるので、動かすことができちゃうんです。

さらに oMLX で量子化する際には、M1/M2 向けに FP16 (float16) という精度も選べます。M1/M2 という初期の Apple Silicon は、現在主流の BF16 (bfloat16) には対応しておらず同精度で書き出された LLM だと性能が発揮できません。もし、Hugging Face に FP16 で書き出されたお目当てのモデルが見つからなくても、自分で簡単に最適化して使うことができるのです。

Redit なんかを見てると、M1/M2 なら FP16 が n% 速いとか、MoE に MTP は効果がない等の既存モデルにも有益な情報がちょいちょいあがってきて助かります。ダウンロードが一番時間がかかるため、量子化前の Qwen3.6 ソースモデルを手元に置いておき、いつでも書き出せるようにしています。ベンチマークのおかげで、書き出し後に残すか消すかの判断がすぐできるのも oMLX のよいところですね。

作者さんが FP16版を HF に置いてくれてます

oMLX の作者の Jun Kim さん (Jundot) や、Issue で FP16 サポートのリクエストをしてくれた Kir Belevich さん (deepsweet) の Hugging Face リポジトリには、FP16 で oQ 量子化された新しめ&有名どころの複数バリアントがアップされています。よって、書き出しが面倒、SSD に空きがない、という方は、ここからダウンロードして使うのが手っ取り早くてオススメです。

Image by Stable Diffusion (Mochi Diffusion) 

いまだにこんな古い画像生成 AI を使ってますが、いかんせんお手軽で良いです。ともあれ今回のキャッチ画像は、記事の内容に合わせて「表彰台に乗った 3人の陸上選手」をイメージ。いつものごとく最小限のプロンプトで出たとこ勝負したのですが、なんと一位の選手のゼッケンが AI。ボクが大好きなバンド、ハク。の Gt. Vo. もあいちゃんなので、即採用となりました。あ、ビジュは全然違います。

Date:
2026年5月30日 23:08:30

Model:
realisticVision-v51VAE_original_768x512_cn

Size:
768 x 512

Include in Image:
top 3 track and field athletes on the podium

Exclude from Image:

Seed:
3829489016

Steps:
21

Guidance Scale:
20.0

Scheduler:
DPM-Solver++

ML Compute Unit:
CPU & GPU

gpt-oss の Harmony Response Format を適切に処理する MLX-LM 用プロキシサーバをご紹介

MLX-LM が v0.30.4 で Nemotron をサポートしたということでアップデートしました。そうしたら予想通り gpt-oss のマルチターンチャットができなくなりました。MLX-LM として OpenAI のHarmony Response Format には絶対対応しないつもりみたいっすね。

以前は直接server.pyスクリプトを魔改造をして動くようにしましたが、アップデートのたびに毎回そうもやってられないので、プロキシサーバを書いて mlx_lm.server の手前に立てることにしました。<think></think>タブを思考部分として隠す LLM クライアント (Dify、Open WebUI、LibreChat、など) で適切に gpt-oss が使えるようになります。思考部分が表示されるクライアントでも、マルチターンチャットができます。

GitHub リポジトリはこちら

https://github.com/tokyohandsome/mlx-lm-harmony-proxy

GitHub の説明文には英語しかないので、日本語の使い方を簡単に書いておきます。

プロキシサーバの立ち上げ方

まずはリポジトリをクローンしてください。

その後.env.example.envとしてコピーし、既存の MLX-LM サーバの IP アドレスとポートに書き換えてください。192.168.100.100:8585 ならこんな感じです。PROXY_PORTは今回立てるプロキシサーバのポート番号ですので、未使用のものを指定してください (同一サーバ上なら 8484 とか)。

.env
MLX_SERVER_URL=http://192.168.100.100:8585/v1
PROXY_PORT=8585

Docker が動いている環境であれば、docker compose up -d --buildでプロキシサーバが立ち上がります。

Docker を使わない場合は、Python の仮想環境にfastapi uvicorn httpx python-dotenvをインストールし、python main.pyで立ち上げます。

LLM クライアントからの接続

無事に立ち上がったら、Dify や Open WebUI 等のクライアントで LLM プロバイダアドレスをプロキシサーバの IP とポートにしてあげるだけです。同一サーバ上であれば、http://192.168.100.100:8484/v1 に変える、みたいなことです。

何をしているのか

gpt-oss からクライアントへのレスポンスにある<|channel|>analysis<|message|><think>に、<|end|><|start|>assistant<|channel|>final<|message|></think>に書き換えます。最近の LLM クライアントは<think></think>の間を思考部分としてチャット画面から隠し、マルチターンの時には LLM に送らないのでチャットが継続できます。

あと Dify でモデルを追加する時には Stream すると失敗するので、そのときはStream: Falseで対応するようにしてあります。

どうしてこれを作ったのか

っていうか、Dify + MLX-LM だと gpt-oss をまともに動かせないので、逆に皆さんどうしてるんでしょうか。うまい方法があるのだろうかと調べるたびに自分の過去のブログが出てきて、欲しい情報が見つからないので作りました。いまだに 32B パラメータ以下ぐらいのクラスだと gpt-oss-20B の性能は高いと思いますけどね。

次回予告?

話は変わりますが Dify の Community Edition って無料で使えてとても良いんですけど、ブラウザのキャッシュをクリアしたら履歴が見れなくなるし、URL がわかれば誰でもアクセスできちゃうしで、会社や複数ユーザでちゃんと使うのには適していません。

ということで、ユーザ認証&セッション管理用に LibreChat をインストールしてみました。立ち上げるのも Dify との連携も面倒ですが、Open WebUI よりレンダリングが速くて HTML タグ等を解釈・描画せずにちゃんと文字として表示してくれたりで、なかなか良いです。ただ、思考部分が長いと?描画がクラッシュするっぽく、そこが解決できたら設定の仕方を記事にしてもいいな、と思い、絶賛いじり中です (2026/02/03 追記) チャット中の画面がクラッシュする原因は LibreChat じゃなくて、MLX or MLX-LM が Devstral-Small-2 とか Nemotron-3 Nano とかに対応し切れていないから、もしくはパラメータの調整がうまくいっていないから、はたまた MLX への変換がうまくいっていないから、という感じがしてきました。

Image by Stable Diffusion (Mochi Diffusion)

電話で情報を受けるプロの代理人、みたいなのを期待してそれっぽいプロンプトを入力。”young” を入れる前はもう少し年齢層が上で、いくらやっても主に目が破綻していたのですが、年齢層を低くしたら安定しました。学習に使った画像の枚数差?

Date:
2026年2月2日 0:11:10

Model:
realisticVision-v51VAE_original_768x512_cn

Size:
768 x 512

Include in Image:
a proxy young lady talking over deskphone in a clean and professional looking room

Exclude from Image:

Seed:
3264770378

Steps:
24

Guidance Scale:
20.0

Scheduler:
DPM-Solver++

ML Compute Unit:
CPU & GPU

Mac の「ミュージック」アプリが曲を 31秒で停止する不具合 (と AirPlay のエラー -71974) を直した話

のっけからあれですが、OS に最初から付いているアプリだからってあまりにも一般的な名称をアプリに付けるのってどうなんすかすね。今回ハマったのは Mac の「ミュージック」アプリ。ウェブブラウザは「Safari」なんだし「iTunes」のままでよかったんじゃないのかな、と。買い切りストアの名称は「iTunes Store」なんだし。あ、でもサブスクは「Apple Music」なんだ?はぁ?どういうこと?

って、簡単に解決しなかったのでちょっとムカついて名前に悪態付いている、というわけではないですよ。純粋に疑問に思ったので。普段は気にならないんですけどね。

不具合の内容

  • Mac の「ミュージック」アプリでローカルに保存済みの曲を再生すると、必ず 31秒で停止 (ポーズ) する
  • そこから続けて再生できるが、31秒後の 1分 2秒でまた止まる
  • エラー表示はない
  • 再生には 3.5mm ケーブルで接続したスピーカーを使用しており、Amazon Prime ビデオや YouTube の再生は全く問題なし

トラブルシューティングステップ

  • まずネットを検索。必ず 31秒で止まるなんて絶対何か有名な不具合に違いないと思ったが、「15秒で止まる」というのはいくつかあるものの 31秒はなかった。いくつかめぼしい手順を試す ↓
  • ミュージックアプリのアカウントメニューからサインアウト、サインイン
  • 同アカウントメニューからアカウント設定に入り、認証済みデバイス 5つを全消し
  • 同アカウント設定のデバイスの管理から、もう使っていない iPhone 等を削除
  • ミュージックアプリの終了
  • macOS の再起動

iTunes Store での試し聴きが最長 31秒だったかも?同ストアで購入した曲で発生している不具合なので、アカウント認証に失敗して全部聴かせてくれないのかも?などとアカウントあたりを盛大に疑ってごちゃごちゃってました。

決定打と解決

macOS の再起動後に曲を再生すると、また 31秒後に止まりました。ため息をつきかけたところでふと画面にエラーポップアップが表示されていることに気がつきました。スクリーンショットを撮っていないので正確な文言は覚えていないんですが、「Bluetooth ヘッドフォンが見つかりません」または「〜接続できません」というメッセージと、二つのボタン「停止」(違うかも?) と「キャンセル」が表示されていました。「Bluetooth ヘッドフォン」の部分は macOS で認識済みのデバイスで、具体的には「soundcore Space Q45」が表示されていました。

すでに曲は止まっているので、停止をクリックするとポップアップが消えるだけ。スペースバーで再生を続けると 31秒後にまた曲が止まってポップアップ表示。で、2回目はキャンセルをクリックしたところ自動的にスピーカーから再生が再開し、その後止まることはありませんでした。はい、解決です

キャンセルをクリックしたとき若干音質が変わった (イコライザの設定が無効になったような、ボリュームが絞られたような) 感じがしましたが、設定上はそういうこともなかったので、これは気のせいかも知れません。ともかく、再生が途中で止まることはなくなりました。

何が起こっていたのだろうか?

ここに書くことはあくまでも想像です。

最後にミュージックアプリで曲を再生していたときは Bluetooh ヘッドセットを使っていて、曲が流れている時にヘッドセットの電源を落としたような覚えがあります。ミュージックアプリが 31秒で曲を止めていたときは、前回使用した Bluetooth デバイスに音声データを送り続けているつもりで、でも実際は macOS が認識をしていないのでタイムアウトが発生していたのではないかと思います (31秒って長いですけど)。エラーのポップアップが出なかったのもバグだと思いますが、もしかしたら他のウィンドウに隠れていたのかも知れません。いずれにせよバグか設計ミスですね。

で、再起動またはその他諸々行ったことが功を奏しミュージックアプリはポップアップを表示できるようになり、ユーザによるキャンセルによって接続されていない Bluetooth デバイスにデータを送り続ける (と 31秒毎に思い至る) という呪縛から解き放たれた、そんな感じじゃないでしょうか。「停止」 (じゃないかも) は、ユーザが Bluetooth デバイスを再接続するのを待つためのボタンで、その状態でまた再生を行うと 31秒後にタイムアウトする、と。

証明しようと曲の再生中にヘッドセットの電源を落とすというのを何度か試したのですが再現できませんでした。タイミングがシビアなタイプのバグのようです (プレステタイトルのデバッグバイト時代を思い出す)。

iPhone でも起こっていた別の不具合

紛らわしいのですが、iPhone の「ミュージック」アプリでも最近不具合が発生していました。iPhone で再生中の曲を AirPlay を使って Mac Studio につないだスピーカから聴くことがあるのですが、ここ最近は数秒再生すると止まってしまっていたのです。面倒ながらも Mac に入って再生すればよいのでとりあえず放置していたのですが、今回の「ミュージックアプリが再生を途中で止める」という不具合が重なったことで余計な詮索を生み、結果解決まで時間がかかった気がします。

ところでこれまで気づかなかったのですが iPhone は画面にちゃんとエラー ( ↓ ) が出ていて、キャプチャできました。このエラー「-71974」の解決方法は、iPhone 本体の再起動です。なので Mac のミュージックアプリの不具合とは無関係でしょう。検索できるように画像から文字をコピペ:

操作を完了できませんでした
原因不明のエラーが起きました (-71974)
ハク。「それしか言えない」(C) TOY’S FACTORY。自分用に歌詞に「ヨ」入れているのは怒られないと思いたい

お小遣い稼ぎに本件のきっかけとなったヘッドセットをご紹介

ノイズキャンセリング性能の高さとバッテリの強さでコストパフォーマンス最高な Anker の Bluetooth ヘッドセット「soundcore Space Q45」。Amazon の各種セールの時はもちろん、何でもないときも謎な割引があって、大体一万円ちょっとで買えます。電車内で音楽を聴くのもヨシ、データセンタで作業するときのノイキャンにもヨシ、映画や YouTube なんかでも遅延は全くないし、なんなら環境音が自分の周りから聞こえてくるように聞こえてふり返ってしまうこともることもあるくらいで、とてもヨイです。

ソースは二つ登録しておけるのでボクは Mac と iPhone で登録していますが、出かけているときはすんなり iPhone につながってくれます。なんと 3.5mm ピンのケーブルもつながります。

モスキート音やネズミよけの高音がほぼ聞こえないボクの様なラッキーな音域をお持ちの方には特にオススメですよ!

Image by Stable Diffusion (Mochi Diffusion)

「Apple Music のバグ」的な画像を作って欲しかったんだけど、リアルな虫はキモいので水彩にしたところ逆に虫が全く出ず、”music”ったって AI も音符を描くのがせいぜいで、「何か」が生まれるのを期待してとにかく大量に生成。その中で “CD っぽいもの” を描くものが出てきたので、なるほどな、とそっちにふって “bug” も “broken” にしてみたりしたらこんなのができました。結構オキニ

Date:
2025年10月5日 0:33:59

Model:
realisticVision-v51VAE_original_768x512_cn

Size:
768 x 512

Include in Image:
watercolors apple broken compact disc

Exclude from Image:

Seed:
609858026

Steps:
20

Guidance Scale:
20.0

Scheduler:
DPM-Solver++

ML Compute Unit:
CPU & GPU

gpt-oss の Reasoning レベル変更を MLX-LM でもできるようにする魔改造

gpt-oss が出てからというもの、MLX-LM サーバの改造を続けていますが、今回は Dify や Open WebUI で Reasoning レベル (High、Middle、Low) の指定ができるようにしました。だってシステムプロンプトに入れようが Open WebUI のカスタムパラメータとして指定しようが全然反映されなかったので。

Reasoning の値を gpt-oss へ渡すのは MLX-LM の仕事であるはずなので、そういう意味ではあるべき姿に矯正したと言えなくもないです (エラそう)。ただ今回も、「ムリヤリだが動けばいい」状態なので、中の人のお手間を取らせる PR とかは恥ずかしくてできません。

今回のコードはこれまでの改造をベースにしています。過去の改造記事のリンクを以下に置いておきますので未読の方はぜひどうぞ。

↓ 回答が途中で止まる、2回目以降のターンがエラーになる、の二つに対処:

↓ 回答の前の思考部分を <details> タグで隠す (改造は主に Dify 向け):

注意事項的な

本来クライアント側で行うべき事とサーバ側で行うべき事を全てサーバで処理しています。とりあえず動くようになったコードだけをこのページに置いておきますので、どうぞ必要な方だけ、見たりパクったりそのまま使ったりしてください。他のモデルに悪影響がある場合は、ポートを指定して別サーバとして立てるなどしてください (gpt-oss 20b 以外では、Qwen3 30B A3B Thinking でのみテスト済み)。

ご自身の環境で使う場合は、必ず元のserver.pyをコピーしておいてください。

コードはこちら

★ 改造済みスクリプト server.py 全てを見るにはここをクリック ★ (1247行あります)
# Copyright © 2023-2024 Apple Inc.

import argparse
import json
import logging
import platform
import socket
import time
import uuid
import warnings
from dataclasses import dataclass, field
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import (
    Any,
    Dict,
    List,
    Literal,
    NamedTuple,
    Optional,
    Sequence,
    Tuple,
    Union,
)

import mlx.core as mx
from huggingface_hub import scan_cache_dir

from ._version import __version__
from .generate import stream_generate
from .models.cache import can_trim_prompt_cache, make_prompt_cache, trim_prompt_cache
from .sample_utils import make_logits_processors, make_sampler
from .utils import common_prefix_len, load

# --- ここから追加 ---
import re
# -- ここまで ---


def get_system_fingerprint():
    gpu_arch = mx.metal.device_info()["architecture"] if mx.metal.is_available() else ""
    return f"{__version__}-{mx.__version__}-{platform.platform()}-{gpu_arch}"


class StopCondition(NamedTuple):
    stop_met: bool
    trim_length: int


def stopping_criteria(
    tokens: List[int],
    stop_id_sequences: List[List[int]],
    eos_token_id: Union[int, None],
) -> StopCondition:
    """
    Determines whether the token generation should stop based on predefined
    conditions.

    Args:
        tokens (List[int]): The current sequence of generated tokens.
        stop_id_sequences (List[List[[int]]): A list of integer lists, each
          representing a sequence of token IDs. If the end of the `tokens`
          list matches any of these sequences, the generation should stop.
        eos_token_id (Union[int, None]): The token ID that represents the
          end-of-sequence. If the last token in `tokens` matches this, the
          generation should stop.

    Returns:
        StopCondition: A named tuple indicating whether the stop condition has
          been met (`stop_met`) and how many tokens should be trimmed from the
          end if it has (`trim_length`).
    """
    if tokens and tokens[-1] == eos_token_id:
        return StopCondition(stop_met=True, trim_length=0)

    for stop_ids in stop_id_sequences:
        if len(tokens) >= len(stop_ids):
            if tokens[-len(stop_ids) :] == stop_ids:
                return StopCondition(stop_met=True, trim_length=len(stop_ids))

    return StopCondition(stop_met=False, trim_length=0)


def sequence_overlap(s1: Sequence, s2: Sequence) -> bool:
    """
    Checks if a suffix of s1 has overlap with a prefix of s2

    Args:
        s1 (Sequence): The first sequence
        s2 (Sequence): The second sequence

    Returns:
        bool: If the two sequences have overlap
    """
    max_overlap = min(len(s1), len(s2))
    return any(s1[-i:] == s2[:i] for i in range(1, max_overlap + 1))


def convert_chat(messages: List[dict], role_mapping: Optional[dict] = None):
    default_role_mapping = {
        "system_prompt": (
            "A chat between a curious user and an artificial intelligence "
            "assistant. The assistant follows the given rules no matter what."
        ),
        "system": "ASSISTANT's RULE: ",
        "user": "USER: ",
        "assistant": "ASSISTANT: ",
        "stop": "\n",
    }
    role_mapping = role_mapping if role_mapping is not None else default_role_mapping

    prompt = ""
    for line in messages:
        role_prefix = role_mapping.get(line["role"], "")
        stop = role_mapping.get("stop", "")
        content = line.get("content", "")
        prompt += f"{role_prefix}{content}{stop}"

    prompt += role_mapping.get("assistant", "")
    return prompt.rstrip()


def process_message_content(messages):
    """
    Convert message content to a format suitable for `apply_chat_template`.

    The function operates on messages in place. It converts the 'content' field
    to a string instead of a list of text fragments.

    Args:
        message_list (list): A list of dictionaries, where each dictionary may
          have a 'content' key containing a list of dictionaries with 'type' and
          'text' keys.

    Raises:
        ValueError: If the 'content' type is not supported or if 'text' is missing.

    """
    for message in messages:
        content = message["content"]
        if isinstance(content, list):
            text_fragments = [
                fragment["text"] for fragment in content if fragment["type"] == "text"
            ]
            if len(text_fragments) != len(content):
                raise ValueError("Only 'text' content type is supported.")
            message["content"] = "".join(text_fragments)
        elif content is None:
            message["content"] = ""


@dataclass
class PromptCache:
    cache: List[Any] = field(default_factory=list)
    model_key: Tuple[str, Optional[str]] = ("", None, None)
    tokens: List[int] = field(default_factory=list)


class ModelProvider:
    def __init__(self, cli_args: argparse.Namespace):
        """Load models on demand and persist them across the whole process."""
        self.cli_args = cli_args
        self.model_key = None
        self.model = None
        self.tokenizer = None
        self.draft_model = None

        # Preload the default model if it is provided
        self.default_model_map = {}
        if self.cli_args.model is not None:
            self.default_model_map[self.cli_args.model] = "default_model"
            self.load(self.cli_args.model, draft_model_path="default_model")

    def _validate_model_path(self, model_path: str):
        model_path = Path(model_path)
        if model_path.exists() and not model_path.is_relative_to(Path.cwd()):
            raise RuntimeError(
                "Local models must be relative to the current working dir."
            )

    # Added in adapter_path to load dynamically
    def load(self, model_path, adapter_path=None, draft_model_path=None):
        model_path, adapter_path, draft_model_path = map(
            lambda s: s.lower() if s else None,
            (model_path, adapter_path, draft_model_path),
        )

        model_path = self.default_model_map.get(model_path, model_path)
        if self.model_key == (model_path, adapter_path, draft_model_path):
            return self.model, self.tokenizer

        # Remove the old model if it exists.
        self.model = None
        self.tokenizer = None
        self.model_key = None
        self.draft_model = None

        # Building tokenizer_config
        tokenizer_config = {
            "trust_remote_code": True if self.cli_args.trust_remote_code else None
        }
        if self.cli_args.chat_template:
            tokenizer_config["chat_template"] = self.cli_args.chat_template

        if model_path == "default_model":
            if self.cli_args.model is None:
                raise ValueError(
                    "A model path has to be given as a CLI "
                    "argument or in the HTTP request"
                )
            adapter_path = adapter_path or self.cli_args.adapter_path
            model, tokenizer = load(
                self.cli_args.model,
                adapter_path=adapter_path,
                tokenizer_config=tokenizer_config,
            )
        else:
            self._validate_model_path(model_path)
            model, tokenizer = load(
                model_path, adapter_path=adapter_path, tokenizer_config=tokenizer_config
            )

        if self.cli_args.use_default_chat_template:
            if tokenizer.chat_template is None:
                tokenizer.chat_template = tokenizer.default_chat_template

        self.model_key = (model_path, adapter_path, draft_model_path)
        self.model = model
        self.tokenizer = tokenizer

        def validate_draft_tokenizer(draft_tokenizer):
            # Check if tokenizers are compatible
            if draft_tokenizer.vocab_size != tokenizer.vocab_size:
                logging.warning(
                    "Draft model tokenizer does not match model tokenizer. "
                    "Speculative decoding may not work as expected."
                )

        # Load draft model if specified
        if (
            draft_model_path == "default_model"
            and self.cli_args.draft_model is not None
        ):
            self.draft_model, draft_tokenizer = load(self.cli_args.draft_model)
            validate_draft_tokenizer(draft_tokenizer)

        elif draft_model_path is not None and draft_model_path != "default_model":
            self._validate_model_path(draft_model_path)
            self.draft_model, draft_tokenizer = load(draft_model_path)
            validate_draft_tokenizer(draft_tokenizer)
        return self.model, self.tokenizer


class APIHandler(BaseHTTPRequestHandler):
    def __init__(
        self,
        model_provider: ModelProvider,
        *args,
        prompt_cache: Optional[PromptCache] = None,
        system_fingerprint: Optional[str] = None,
        **kwargs,
    ):
        """
        Create static request specific metadata
        """
        self.created = int(time.time())
        self.model_provider = model_provider
        self.prompt_cache = prompt_cache or PromptCache()
        self.system_fingerprint = system_fingerprint or get_system_fingerprint()

        # --- ここから追加 ---
        self.reasoning_effort = "medium"
        # --- ここまで ---

        super().__init__(*args, **kwargs)

    def _set_cors_headers(self):
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "*")
        self.send_header("Access-Control-Allow-Headers", "*")

    def _set_completion_headers(self, status_code: int = 200):
        self.send_response(status_code)
        self.send_header("Content-type", "application/json")
        self._set_cors_headers()

    def _set_stream_headers(self, status_code: int = 200):
        self.send_response(status_code)
        self.send_header("Content-type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self._set_cors_headers()

    def do_OPTIONS(self):
        self._set_completion_headers(204)
        self.end_headers()

    def do_POST(self):
        """
        Respond to a POST request from a client.
        """
        endpoints = {
            "/v1/completions": self.handle_text_completions,
            "/v1/chat/completions": self.handle_chat_completions,
            "/chat/completions": self.handle_chat_completions,
        }

        if self.path not in endpoints:
            self._set_completion_headers(404)
            self.end_headers()
            self.wfile.write(b"Not Found")
            return

        # Fetch and parse request body
        content_length = int(self.headers["Content-Length"])
        raw_body = self.rfile.read(content_length)

        try:
            self.body = json.loads(raw_body.decode())
        except json.JSONDecodeError as e:
            logging.error(f"JSONDecodeError: {e} - Raw body: {raw_body.decode()}")
            # Set appropriate headers based on streaming requirement
            if self.stream:
                self._set_stream_headers(400)
                self.wfile.write(
                    f"data: {json.dumps({'error': f'Invalid JSON in request body: {e}'})}\n\n".encode()
                )
            else:
                self._set_completion_headers(400)
                self.wfile.write(
                    json.dumps({"error": f"Invalid JSON in request body: {e}"}).encode()
                )
            return

        indent = "\t"  # Backslashes can't be inside of f-strings
        logging.debug(f"Incoming Request Body: {json.dumps(self.body, indent=indent)}")
        assert isinstance(
            self.body, dict
        ), f"Request should be dict, but got {type(self.body)}"

        # Extract request parameters from the body
        self.stream = self.body.get("stream", False)
        self.stream_options = self.body.get("stream_options", None)
        self.requested_model = self.body.get("model", "default_model")
        self.requested_draft_model = self.body.get("draft_model", "default_model")
        self.num_draft_tokens = self.body.get(
            "num_draft_tokens", self.model_provider.cli_args.num_draft_tokens
        )
        self.adapter = self.body.get("adapters", None)
        self.max_tokens = self.body.get("max_completion_tokens", None)
        if self.max_tokens is None:
            self.max_tokens = self.body.get(
                "max_tokens", self.model_provider.cli_args.max_tokens
            )
        self.temperature = self.body.get(
            "temperature", self.model_provider.cli_args.temp
        )
        self.top_p = self.body.get("top_p", self.model_provider.cli_args.top_p)
        self.top_k = self.body.get("top_k", self.model_provider.cli_args.top_k)
        self.min_p = self.body.get("min_p", self.model_provider.cli_args.min_p)
        self.repetition_penalty = self.body.get("repetition_penalty", 1.0)
        self.repetition_context_size = self.body.get("repetition_context_size", 20)
        # --- ここから追加 ---
        requested_effort = self.body.get("reasoning_effort", None)
        if requested_effort is not None:
            self.reasoning_effort = requested_effort
        # --- ここまで ---

        self.xtc_probability = self.body.get("xtc_probability", 0.0)
        self.xtc_threshold = self.body.get("xtc_threshold", 0.0)
        self.logit_bias = self.body.get("logit_bias", None)
        self.logprobs = self.body.get("logprobs", -1)
        self.validate_model_parameters()
        # Load the model if needed
        try:
            self.model, self.tokenizer = self.model_provider.load(
                self.requested_model,
                self.adapter,
                self.requested_draft_model,
            )
        except:
            self._set_completion_headers(404)
            self.end_headers()
            self.wfile.write(b"Not Found")
            return

        # Get stop id sequences, if provided
        stop_words = self.body.get("stop")
        stop_words = stop_words or []
        stop_words = [stop_words] if isinstance(stop_words, str) else stop_words
        stop_id_sequences = [
            self.tokenizer.encode(stop_word, add_special_tokens=False)
            for stop_word in stop_words
        ]

        # Send header type
        (
            self._set_stream_headers(200)
            if self.stream
            else self._set_completion_headers(200)
        )

        # --- ここから追加 ---
        requested_effort = self.body.get("reasoning_effort", None)
        if requested_effort is not None:
            self.reasoning_effort = requested_effort

        # --- ここまで ---
        
        # Call endpoint specific method
        prompt = endpoints[self.path]()
        self.handle_completion(prompt, stop_id_sequences)

    def validate_model_parameters(self):
        """
        Validate the model parameters passed in the request for the correct types and values.
        """
        if not isinstance(self.stream, bool):
            raise ValueError("stream must be a boolean")

        if not isinstance(self.max_tokens, int) or self.max_tokens < 0:
            raise ValueError("max_tokens must be a non-negative integer")

        if not isinstance(self.temperature, (float, int)) or self.temperature < 0:
            raise ValueError("temperature must be a non-negative float")

        if not isinstance(self.top_p, (float, int)) or self.top_p < 0 or self.top_p > 1:
            raise ValueError("top_p must be a float between 0 and 1")

        if not isinstance(self.top_k, int) or self.top_k < 0:
            raise ValueError("top_k must be a non-negative integer")

        if not isinstance(self.min_p, (float, int)) or self.min_p < 0 or self.min_p > 1:
            raise ValueError("min_p must be a float between 0 and 1")

        if not isinstance(self.num_draft_tokens, int) or self.num_draft_tokens < 0:
            raise ValueError("num_draft_tokens must be a non-negative integer")

        if (
            not isinstance(self.repetition_penalty, (float, int))
            or self.repetition_penalty < 0
        ):
            raise ValueError("repetition_penalty must be a non-negative float")

        if self.logprobs != -1 and not (0 < self.logprobs <= 10):
            raise ValueError(
                f"logprobs must be between 1 and 10 but got {self.logprobs:,}"
            )

        if (
            not isinstance(self.repetition_context_size, int)
            or self.repetition_context_size < 0
        ):
            raise ValueError("repetition_context_size must be a non-negative integer")

        if self.logit_bias is not None:
            if not isinstance(self.logit_bias, dict):
                raise ValueError("logit_bias must be a dict of int to float")

            try:
                self.logit_bias = {int(k): v for k, v in self.logit_bias.items()}
            except ValueError:
                raise ValueError("logit_bias must be a dict of int to float")
        if not (
            isinstance(self.xtc_probability, float)
            and 0.00 <= self.xtc_probability <= 1.00
        ):
            raise ValueError(f"xtc_probability must be a float between 0.00 and 1.00")
        if not (
            isinstance(self.xtc_threshold, float) and 0.00 <= self.xtc_threshold <= 0.50
        ):
            raise ValueError(f"xtc_threshold must be a float between 0.00 and 0.5")
        if not isinstance(self.requested_model, str):
            raise ValueError("model must be a string")
        if self.adapter is not None and not isinstance(self.adapter, str):
            raise ValueError("adapter must be a string")

        # --- ここから追加 ---
        if self.reasoning_effort is not None:
            valid_efforts = ["low", "medium", "high"]
            if not isinstance(self.reasoning_effort, str) or self.reasoning_effort.lower() not in valid_efforts:
                logging.warning(f"Invalid value '{self.reasoning_effort}' for reasoning_effort. Defaulting to 'medium'.")
                self.reasoning_effort = "medium"
            else:
                self.reasoning_effort = self.reasoning_effort.lower() # 一貫性のために小文字に変換
        # --- ここまで ---

    def generate_response(
        self,
        text: str,
        finish_reason: Union[Literal["length", "stop"], None],
        prompt_token_count: Optional[int] = None,
        completion_token_count: Optional[int] = None,
        token_logprobs: Optional[List[float]] = None,
        top_tokens: Optional[List[Dict[int, float]]] = None,
        tokens: Optional[List[int]] = None,
        tool_calls: Optional[List[str]] = None,
    ) -> dict:
        """
        Generate a single response packet based on response type (stream or
        not), completion type and parameters.

        Args:
            text (str): Text generated by model
            finish_reason (Union[Literal["length", "stop"], None]): The reason the
              response is being sent: "length", "stop" or `None`.
            prompt_token_count (Optional[int]): The number of tokens in the prompt,
              used to populate the "usage" field (not used when stream).
            completion_token_count (Optional[int]): The number of tokens in the
              response, used to populate the "usage" field (not used when stream).
            token_logprobs (Optional[List[float]]): The log probabilities per token,
              in token order.
            top_tokens (Optional[List[Dict[int, float]]]): List of dictionaries mapping
              tokens to logprobs for the top N tokens at each token position.
            tokens (Optional[List[int]]): List of tokens to return with logprobs structure
            tool_calls (Optional[List[str]]): List of tool calls.

        Returns:
            dict: A dictionary containing the response, in the same format as
              OpenAI's API.
        """
        token_logprobs = token_logprobs or []
        top_logprobs = top_tokens or []
        tool_calls = tool_calls or []

        def parse_function(tool_text):
            tool_call = json.loads(tool_text.strip())
            return {
                "function": {
                    "name": tool_call.get("name", None),
                    "arguments": json.dumps(tool_call.get("arguments", "")),
                },
                "type": "function",
                "id": None,
            }

        # Static response
        response = {
            "id": self.request_id,
            "system_fingerprint": self.system_fingerprint,
            "object": self.object_type,
            "model": self.requested_model,
            "created": self.created,
            "choices": [
                {
                    "index": 0,
                    "finish_reason": finish_reason,
                },
            ],
        }

        if token_logprobs or top_logprobs or tokens:
            response["choices"][0]["logprobs"] = {
                "token_logprobs": token_logprobs,
                "top_logprobs": top_logprobs,
                "tokens": tokens,
            }

        if not self.stream:
            if not (
                isinstance(prompt_token_count, int)
                and isinstance(completion_token_count, int)
            ):
                raise ValueError(
                    "Response type is complete, but token counts not provided"
                )

            response["usage"] = {
                "prompt_tokens": prompt_token_count,
                "completion_tokens": completion_token_count,
                "total_tokens": prompt_token_count + completion_token_count,
            }

        choice = response["choices"][0]

        # Add dynamic response
        if self.object_type.startswith("chat.completion"):
            key_name = "delta" if self.stream else "message"
            choice[key_name] = {
                "role": "assistant",
                "content": text,
                "tool_calls": [parse_function(tool_text) for tool_text in tool_calls],
            }
        elif self.object_type == "text_completion":
            choice.update(text=text)
        else:
            raise ValueError(f"Unsupported response type: {self.object_type}")

        return response

    def reset_prompt_cache(self, prompt):
        """Resets the prompt cache and associated state.

        Args:
            prompt (List[int]): The tokenized new prompt which will populate the
                reset cache.
        """
        logging.debug(f"*** Resetting cache. ***")
        self.prompt_cache.model_key = self.model_provider.model_key
        self.prompt_cache.cache = make_prompt_cache(self.model_provider.model)
        if self.model_provider.draft_model is not None:
            self.prompt_cache.cache += make_prompt_cache(
                self.model_provider.draft_model
            )
        self.prompt_cache.tokens = list(prompt)  # Cache the new prompt fully

    def get_prompt_cache(self, prompt):
        """
        Determines the portion of the prompt that needs processing by comparing
        it to the cached prompt and attempting to reuse the common prefix.

        This function updates the internal prompt cache state (tokens and model cache)
        based on the comparison. If a common prefix exists, it attempts to trim
        the model cache (if supported) to match the common prefix length, avoiding
        recomputation.

        Args:
            prompt (List[int]): The tokenized new prompt.

        Returns:
            List[int]: The suffix of the prompt that actually needs to be processed
                       by the model. This will be the full prompt if the cache is
                       reset or cannot be effectively used.
        """
        cache_len = len(self.prompt_cache.tokens)
        prompt_len = len(prompt)
        com_prefix_len = common_prefix_len(self.prompt_cache.tokens, prompt)

        # Leave at least one token in the prompt
        com_prefix_len = min(com_prefix_len, len(prompt) - 1)

        # Condition 1: Model changed or no common prefix at all. Reset cache.
        if (
            self.prompt_cache.model_key != self.model_provider.model_key
            or com_prefix_len == 0
        ):
            self.reset_prompt_cache(prompt)

        # Condition 2: Common prefix exists and matches cache length. Process suffix.
        elif com_prefix_len == cache_len:
            logging.debug(
                f"*** Cache is prefix of prompt (cache_len: {cache_len}, prompt_len: {prompt_len}). Processing suffix. ***"
            )
            prompt = prompt[com_prefix_len:]
            self.prompt_cache.tokens.extend(prompt)

        # Condition 3: Common prefix exists but is shorter than cache length. Attempt trim.
        elif com_prefix_len < cache_len:
            logging.debug(
                f"*** Common prefix ({com_prefix_len}) shorter than cache ({cache_len}). Attempting trim. ***"
            )

            if can_trim_prompt_cache(self.prompt_cache.cache):
                num_to_trim = cache_len - com_prefix_len
                logging.debug(f"    Trimming {num_to_trim} tokens from cache.")
                trim_prompt_cache(self.prompt_cache.cache, num_to_trim)
                self.prompt_cache.tokens = self.prompt_cache.tokens[:com_prefix_len]
                prompt = prompt[com_prefix_len:]
                self.prompt_cache.tokens.extend(prompt)
            else:
                logging.debug(f"    Cache cannot be trimmed. Resetting cache.")
                self.reset_prompt_cache(prompt)

        # This case should logically not be reached if com_prefix_len <= cache_len
        else:
            logging.error(
                f"Unexpected cache state: com_prefix_len ({com_prefix_len}) > cache_len ({cache_len}). Resetting cache."
            )
            self.reset_prompt_cache(prompt)

        logging.debug(f"Returning {len(prompt)} tokens for processing.")
        return prompt

    def handle_completion(
        self,
        prompt: List[int],
        stop_id_sequences: List[List[int]],
    ):
        """
        Generate a response to a prompt and send it to the client in a single batch.

        Args:
            prompt (List[int]): The tokenized prompt.
            stop_id_sequences (List[List[int]]): A list of stop words passed
                to the stopping_criteria function
        """
        tokens = []
        finish_reason = "length"
        stop_sequence_suffix = None
        if self.stream:
            self.end_headers()
            logging.debug(f"Starting stream:")
        else:
            logging.debug(f"Starting completion:")
        token_logprobs = []
        top_tokens = []

        prompt = self.get_prompt_cache(prompt)

        text = ""
        tic = time.perf_counter()
        sampler = make_sampler(
            self.temperature,
            top_p=self.top_p,
            top_k=self.top_k,
            min_p=self.min_p,
            xtc_probability=self.xtc_probability,
            xtc_threshold=self.xtc_threshold,
            xtc_special_tokens=[
                self.tokenizer.eos_token_id,
                self.tokenizer.encode("\n"),
            ],
        )
        logits_processors = make_logits_processors(
            self.logit_bias,
            self.repetition_penalty,
            self.repetition_context_size,
        )

        tool_calls = []
        tool_text = ""
        in_tool_call = False
        segment = ""

        # --- ▼▼▼ ここから追加 ▼▼▼ ---
        # レスポンス形式を整形するための状態管理変数を初期化
        gemma_buffer = ""
        # 状態: INITIAL -> BUFFERING -> AWAITING_FINAL -> STREAMING
        gemma_state = "INITIAL"
        # --- ▲▲▲ ここまで追加 ▲▲▲ ---

        # Create keepalive callback to send SSE comments during long prompt processing
        def keepalive_callback(processed_tokens, total_tokens):
            logging.info(
                f"Prompt processing progress: {processed_tokens}/{total_tokens}"
            )
            if self.stream:
                try:
                    # Send SSE comment for keepalive - invisible to clients but keeps connection alive
                    self.wfile.write(
                        f": keepalive {processed_tokens}/{total_tokens}\n\n".encode()
                    )
                    self.wfile.flush()
                except (BrokenPipeError, ConnectionResetError, OSError):
                    # Client disconnected, ignore
                    pass

        for gen_response in stream_generate(
            model=self.model,
            tokenizer=self.tokenizer,
            prompt=prompt,
            max_tokens=self.max_tokens,
            sampler=sampler,
            logits_processors=logits_processors,
            prompt_cache=self.prompt_cache.cache,
            draft_model=self.model_provider.draft_model,
            num_draft_tokens=self.num_draft_tokens,
            prompt_progress_callback=keepalive_callback,
        ):
            logging.debug(gen_response.text)

            if (
                self.tokenizer.has_tool_calling
                and gen_response.text == self.tokenizer.tool_call_start
            ):
                in_tool_call = True
            elif in_tool_call:
                if gen_response.text == self.tokenizer.tool_call_end:
                    tool_calls.append(tool_text)
                    tool_text = ""
                    in_tool_call = False
                else:
                    tool_text += gen_response.text
            else:
                # --- ▼▼▼ ここから変更 ▼▼▼ ---
                # ストリーミングが有効、かつツールコール中でない場合に整形処理を実行
                if self.stream and not in_tool_call:
                    gemma_buffer += gen_response.text
                    segment_to_send = ""

                    # 状態: 初期状態。レスポンス形式を判定する
                    if gemma_state == "INITIAL":
                        if "<|channel|>" in gemma_buffer:
                            gemma_state = "BUFFERING"
                        elif len(gemma_buffer) > 11:  # len("<|channel|>")
                            gemma_state = "STREAMING"

                    # 状態: バッファリング中。analysisシーケンスを探す
                    if gemma_state == "BUFFERING":
                        analysis_seq = "<|channel|>analysis<|message|>"
                        if analysis_seq in gemma_buffer:
                            segment_to_send = gemma_buffer.replace(analysis_seq, f"<details>{analysis_seq}")
                            gemma_buffer = ""
                            gemma_state = "AWAITING_FINAL"

                    # 状態: finalシーケンスを待機中
                    if gemma_state == "AWAITING_FINAL":
                        final_seq = "<|channel|>final<|message|>"
                        if final_seq in gemma_buffer:
                            segment_to_send += gemma_buffer.replace(final_seq, f"{final_seq}</details>  ")
                            gemma_buffer = ""
                            gemma_state = "STREAMING"
                        else:
                            # シーケンスがトークン境界で分割される可能性を考慮し、
                            # バッファの末尾(シーケンス長-1)文字を残して送信
                            safe_flush_len = len(gemma_buffer) - (len(final_seq) - 1)
                            if safe_flush_len > 0:
                                segment_to_send += gemma_buffer[:safe_flush_len]
                                gemma_buffer = gemma_buffer[safe_flush_len:]

                    # 状態: 通常ストリーミング。バッファをすべて送信
                    if gemma_state == "STREAMING":
                        segment_to_send += gemma_buffer
                        gemma_buffer = ""

                    # 処理したテキストを送信セグメントと全体テキストに追加
                    segment += segment_to_send
                    text += segment_to_send
                else:
                    # ストリーミングでない場合やツールコール中は元の動作
                    text += gen_response.text
                    segment += gen_response.text
                # --- ▲▲▲ ここまで変更 ▲▲▲ ---
            token = gen_response.token
            logprobs = gen_response.logprobs
            tokens.append(token)
            self.prompt_cache.tokens.append(token)

            if self.logprobs > 0:
                sorted_indices = mx.argpartition(-logprobs, kth=self.logprobs - 1)
                top_indices = sorted_indices[: self.logprobs]
                top_logprobs = logprobs[top_indices]
                top_token_info = zip(top_indices.tolist(), top_logprobs.tolist())
                top_tokens.append(tuple(top_token_info))

            token_logprobs.append(logprobs[token].item())

            stop_condition = stopping_criteria(
                tokens, stop_id_sequences, self.tokenizer.eos_token_id
            )
            if stop_condition.stop_met:
                finish_reason = "stop"
                if stop_condition.trim_length:
                    stop_sequence_suffix = self.tokenizer.decode(
                        tokens[-stop_condition.trim_length :]
                    )
                    text = text[: -len(stop_sequence_suffix)]
                segment = ""
                break

            if self.stream and not in_tool_call:
                # If the end of tokens overlaps with a stop sequence, generate new
                # tokens until we know if the stop sequence is hit or not
                if any(
                    (
                        sequence_overlap(tokens, sequence)
                        for sequence in stop_id_sequences
                    )
                ):
                    continue
                elif segment or tool_calls:
                    response = self.generate_response(
                        segment, None, tool_calls=tool_calls
                    )
                    self.wfile.write(f"data: {json.dumps(response)}\n\n".encode())
                    self.wfile.flush()
                    segment = ""
                    tool_calls = []

        # --- ▼▼▼ ここから追加 ▼▼▼ ---
        # ループ終了後、バッファにデータが残っていれば最後のセグメントに追加
        if gemma_buffer:
            segment += gemma_buffer
            gemma_buffer = ""
        # --- ▲▲▲ ここまで追加 ▲▲▲ ---

        if gen_response.finish_reason is not None:
            finish_reason = gen_response.finish_reason

        logging.debug(f"Prompt: {gen_response.prompt_tps:.3f} tokens-per-sec")
        logging.debug(f"Generation: {gen_response.generation_tps:.3f} tokens-per-sec")
        logging.debug(f"Peak memory: {gen_response.peak_memory:.3f} GB")

        if self.stream:
            response = self.generate_response(
                segment, finish_reason, tool_calls=tool_calls
            )
            self.wfile.write(f"data: {json.dumps(response)}\n\n".encode())
            self.wfile.flush()
            if self.stream_options is not None and self.stream_options["include_usage"]:
                original_prompt_length = (
                    len(self.prompt_cache.tokens) - len(tokens) + len(prompt)
                )
                response = self.completion_usage_response(
                    original_prompt_length, len(tokens)
                )
                self.wfile.write(f"data: {json.dumps(response)}\n\n".encode())
                self.wfile.flush()
            self.wfile.write("data: [DONE]\n\n".encode())
            self.wfile.flush()
        else:
            response = self.generate_response(
                text,
                finish_reason,
                len(prompt),
                len(tokens),
                token_logprobs=token_logprobs,
                top_tokens=top_tokens,
                tokens=tokens,
                tool_calls=tool_calls,
            )
            response_json = json.dumps(response).encode()
            indent = "\t"  # Backslashes can't be inside of f-strings
            logging.debug(f"Outgoing Response: {json.dumps(response, indent=indent)}")

            # Send an additional Content-Length header when it is known
            self.send_header("Content-Length", str(len(response_json)))
            self.end_headers()
            self.wfile.write(response_json)
            self.wfile.flush()

    def completion_usage_response(
        self,
        prompt_token_count: Optional[int] = None,
        completion_token_count: Optional[int] = None,
    ):
        response = {
            "id": self.request_id,
            "system_fingerprint": self.system_fingerprint,
            "object": "chat.completion",
            "model": self.requested_model,
            "created": self.created,
            "choices": [],
            "usage": {
                "prompt_tokens": prompt_token_count,
                "completion_tokens": completion_token_count,
                "total_tokens": prompt_token_count + completion_token_count,
            },
        }
        return response

    def handle_chat_completions(self) -> List[int]:
        """
        Handle a chat completion request.

        Returns:
            mx.array: A mx.array of the tokenized prompt from the request body
        """
        body = self.body
        assert "messages" in body, "Request did not contain messages"

        # Determine response type
        self.request_id = f"chatcmpl-{uuid.uuid4()}"
        self.object_type = "chat.completion.chunk" if self.stream else "chat.completion"

        # --- 1. システムプロンプトから `reasoning_effort` を最初に抽出 ---
        reasoning_level_from_prompt = None
        for message in body["messages"]:
            if message.get("role") == "system":
                matches = re.findall(r"Reasoning:[\s\n]*(\w+)", message.get("content", ""), re.IGNORECASE)
                if matches:
                    # 最後にマッチした値を採用
                    reasoning_level_from_prompt = matches[-1]
                    break
        
        # 2. 抽出した値を `self.reasoning_effort` に設定し、デフォルト値を上書き
        if reasoning_level_from_prompt is not None:
            self.reasoning_effort = reasoning_level_from_prompt
        
        # --- ここから他のメッセージ処理ロジック ---
        if self.tokenizer.chat_template:
            messages = body["messages"]

            # `assistant`メッセージを処理するカスタムロジック
            # この処理はシステムメッセージに影響しないため、ここに配置します
            for message in messages:
                if message["role"] == "assistant":
                    content = message.get("content", "")
                    if "<|channel|>analysis<|message|>" in content and "<|channel|>final<|message|>" in content:
                        try:
                            analysis_start_tag = "<|channel|>analysis<|message|>"
                            analysis_end_tag = "<|end|>"
                            final_start_tag = "<|channel|>final<|message|>"
                            analysis_start = content.find(analysis_start_tag) + len(analysis_start_tag)
                            analysis_end = content.find(analysis_end_tag)
                            final_start = content.find(final_start_tag) + len(final_start_tag)
                            analysis = content[analysis_start:analysis_end].strip()
                            final = content[final_start:].strip()
                            message["content"] = final
                            message["thinking"] = analysis
                        except Exception as e:
                            logging.error(f"Failed to parse assistant message with analysis/final tags: {e}")
                            message["thinking"] = ""
            
            # メッセージコンテンツが確定した後に呼び出す
            process_message_content(messages)

            # 3. `chat_template_args`を作成し、`reasoning_effort`を渡す
            chat_template_args = self.model_provider.cli_args.chat_template_args.copy()
            if self.reasoning_effort is not None:
                chat_template_args["reasoning_effort"] = self.reasoning_effort

            prompt = self.tokenizer.apply_chat_template(
                messages,
                body.get("tools") or None,
                add_generation_prompt=True,
                **chat_template_args,
            )
        else:
            # 非チャットテンプレートモデルの既存ロジック
            prompt = convert_chat(body["messages"], body.get("role_mapping"))
            prompt = self.tokenizer.encode(prompt)

        # --- ここから追加 --- 
        # ログに Reasoning の設定を書き出す
        logging.info(
            #f"Request {self.request_id} for model '{self.model_provider.model_name}' "
            f"Reasoning Effort '{self.reasoning_effort}'"
        )
        # --- ここまで ---

        return prompt

    def handle_text_completions(self) -> List[int]:
        """
        Handle a text completion request.

        Returns:
            mx.array: A mx.array of the tokenized prompt from the request body
        """
        # Determine response type
        self.request_id = f"cmpl-{uuid.uuid4()}"
        self.object_type = "text_completion"
        assert "prompt" in self.body, "Request did not contain a prompt"
        return self.tokenizer.encode(self.body["prompt"])

    def do_GET(self):
        """
        Respond to a GET request from a client.
        """
        if self.path.startswith("/v1/models"):
            self.handle_models_request()
        elif self.path == "/health":
            self.handle_health_check()
        else:
            self._set_completion_headers(404)
            self.end_headers()
            self.wfile.write(b"Not Found")

    def handle_health_check(self):
        """
        Handle a GET request for the /health endpoint.
        """
        self._set_completion_headers(200)
        self.end_headers()

        self.wfile.write('{"status": "ok"}'.encode())
        self.wfile.flush()

    def handle_models_request(self):
        """
        Handle a GET request for the /v1/models endpoint.
        """
        self._set_completion_headers(200)
        self.end_headers()

        files = ["config.json", "model.safetensors.index.json", "tokenizer_config.json"]

        parts = self.path.split("/")
        filter_repo_id = None
        if len(parts) > 3:
            filter_repo_id = "/".join(parts[3:])

        def probably_mlx_lm(repo):
            if repo.repo_type != "model":
                return False
            if "main" not in repo.refs:
                return False
            if filter_repo_id is not None and repo.repo_id != filter_repo_id:
                return False
            file_names = {f.file_path.name for f in repo.refs["main"].files}
            return all(f in file_names for f in files)

        # Scan the cache directory for downloaded mlx models
        hf_cache_info = scan_cache_dir()
        downloaded_models = [
            repo for repo in hf_cache_info.repos if probably_mlx_lm(repo)
        ]

        # Create a list of available models
        models = [
            {
                "id": repo.repo_id,
                "object": "model",
                "created": self.created,
            }
            for repo in downloaded_models
        ]

        response = {"object": "list", "data": models}

        response_json = json.dumps(response).encode()
        self.wfile.write(response_json)
        self.wfile.flush()


def run(
    host: str,
    port: int,
    model_provider: ModelProvider,
    server_class=HTTPServer,
    handler_class=APIHandler,
):
    server_address = (host, port)
    prompt_cache = PromptCache()
    infos = socket.getaddrinfo(
        *server_address, type=socket.SOCK_STREAM, flags=socket.AI_PASSIVE
    )
    server_class.address_family, _, _, _, server_address = next(iter(infos))
    httpd = server_class(
        server_address,
        lambda *args, **kwargs: handler_class(
            model_provider,
            prompt_cache=prompt_cache,
            system_fingerprint=get_system_fingerprint(),
            *args,
            **kwargs,
        ),
    )
    warnings.warn(
        "mlx_lm.server is not recommended for production as "
        "it only implements basic security checks."
    )
    logging.info(f"Starting httpd at {host} on port {port}...")
    httpd.serve_forever()


def main():
    parser = argparse.ArgumentParser(description="MLX Http Server.")
    parser.add_argument(
        "--model",
        type=str,
        help="The path to the MLX model weights, tokenizer, and config",
    )
    parser.add_argument(
        "--adapter-path",
        type=str,
        help="Optional path for the trained adapter weights and config.",
    )
    parser.add_argument(
        "--host",
        type=str,
        default="127.0.0.1",
        help="Host for the HTTP server (default: 127.0.0.1)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8080,
        help="Port for the HTTP server (default: 8080)",
    )
    parser.add_argument(
        "--draft-model",
        type=str,
        help="A model to be used for speculative decoding.",
        default=None,
    )
    parser.add_argument(
        "--num-draft-tokens",
        type=int,
        help="Number of tokens to draft when using speculative decoding.",
        default=3,
    )
    parser.add_argument(
        "--trust-remote-code",
        action="store_true",
        help="Enable trusting remote code for tokenizer",
    )
    parser.add_argument(
        "--log-level",
        type=str,
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set the logging level (default: INFO)",
    )
    parser.add_argument(
        "--chat-template",
        type=str,
        default="",
        help="Specify a chat template for the tokenizer",
        required=False,
    )
    parser.add_argument(
        "--use-default-chat-template",
        action="store_true",
        help="Use the default chat template",
    )
    parser.add_argument(
        "--temp",
        type=float,
        default=0.0,
        help="Default sampling temperature (default: 0.0)",
    )
    parser.add_argument(
        "--top-p",
        type=float,
        default=1.0,
        help="Default nucleus sampling top-p (default: 1.0)",
    )
    parser.add_argument(
        "--top-k",
        type=int,
        default=0,
        help="Default top-k sampling (default: 0, disables top-k)",
    )
    parser.add_argument(
        "--min-p",
        type=float,
        default=0.0,
        help="Default min-p sampling (default: 0.0, disables min-p)",
    )
    parser.add_argument(
        "--max-tokens",
        type=int,
        default=512,
        help="Default maximum number of tokens to generate (default: 512)",
    )
    parser.add_argument(
        "--chat-template-args",
        type=json.loads,
        help="""A JSON formatted string of arguments for the tokenizer's apply_chat_template, e.g. '{"enable_thinking":false}'""",
        default="{}",
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=getattr(logging, args.log_level.upper(), None),
        format="%(asctime)s - %(levelname)s - %(message)s",
    )
    run(args.host, args.port, ModelProvider(args))


if __name__ == "__main__":
    print(
        "Calling `python -m mlx_lm.server...` directly is deprecated."
        " Use `mlx_lm.server...` or `python -m mlx_lm server ...` instead."
    )
    main()

改造の元にしたバージョンはこちら:

  • MLX: 0.29.0
  • MLX-LM: 0.27.0

実際に改造済みコードを使えるようにする手順は以前の記事にそれなりに細かく書いたので、そちらを参照してください。基本的には単純にコードをコピーして MLX-LM のserver.pyを上書きするだけです。

Dify や Open WebUI からの使い方

システムプロンプトにReasoning: High (選べるオプション: High/Middle/Low) 等と入れてあげれば指定に従って推論してくれます。指定しなければMiddleで、大文字小文字は区別しません。

Dify では、割といい感じで回答してくれる他のパラメータの設定と併せてこんな感じ ↓ で使ってください。

MLX バックエンドなので Max Tokens は最大でも当分は大丈夫

Open WebUI だと特に繰り返しの文章が頻発するので、パラメータを結構いじっています。いくつかはデフォルト値と同じで、いくつかは OpenAI の推奨設定です。

効果

確実に推論 (reasoning/thinking) に使われるトークン数は High が多く、Low だとほとんど使いません。つまり、High にしたほうが回答の出力が始まるまでじっくり考えています。

Token/sec (トークン数/秒 = 出力の速さ) ということで言えばどれもだいたい 80 T/S を超えてくるので大きな違いはありません。質問を投げてから回答が終わるまでの時間は当然 Low が一番速くなりますが、非 reasoning/thinking 系モデルのようにすぐに回答が始まるわけではなく、質問の難度によってある程度の推論は行われます (Qwen3 32B 等の/no_thinkとは違い、あくまでもLow)。

回答の質はどうかというと、試した感じでは確かに High にした場合が一番良い回答を得られています。Middle ではいくらラウンドを繰り返して説得しても「10cm」と回答してくれなかった下の質問ですが、High にすると毎回では無いものの「10cm」と回答することがあるようになりました (Temperature や Top_P の値が大きいので、回答内容には結構幅がある)。

厚さ 10cm の綿飴の上に厚さ 10cm のレンガを載せたときの合計の高さを教えてください。物理世界で最も起こり得る可能性を重要視してください

ただ、原因は不明ながら High と Low だと日本語が若干怪しくなるときがあります。

なぜそこまでして gpt-oss を使うのか

リリース当初こそ多くの gpt-oss 関連記事が見られましたが、性能が見限られたのか最近ほとんど使っているという情報を目にしなくなりました。ググってもボクのブログ記事が上位に来てしまう状況です。実際自分で使っていても、上に書いた質問にはいくら説得しても答えられないし、頻繁に同じ事を吐き続けるループに陥るし、という感じで、速いだけのじゃじゃ馬感がありました。

それでも LLM の性能比較によく見に行っているサイト ( ↓ ) では gpt-oss-20B (high) が 20-30 クラスの LLM では悪くない位置にいます。なので、諦めるにしても性能面でのポテンシャルをもう少し自分で確かめたかったというのが大きいですね。

Comparison of Models: Intelligence, Performance & Price Analysis

あとは、検閲の入っていないオープンで性能の良いモデルを使いたいというところでしょうか。ま、ここはみなさん色々と意見がありそうですが。

とまれ、これまでの改造で Mac に最適化された gpt-oss のポテンシャルを試す下地はほぼできたんじゃないでしょうか。32 GB ユニファイドメモリの Mac で動くローカル LLM はどれがちょうどいいのか、色々と使いながら試していこうと思います。

あ、今回の改造も最終的には無料版の Gemini (2.5 Flash) に手伝ってもらいました。無料版の ChatGPT はあんまりよくないですね。チャットをまたいだ記憶とか別にいらんのよな。

Image by Stable Diffusion (Mochi Diffusion)

「手綱をつけられたじゃじゃ馬」のイメージでお願いしてみました。OpenAI のロゴは白いイメージだったのですけど、調べたら緑や紫のもあるんですね。白い馬にしましたけど。

Date:
2025年9月15日 21:32:03

Model:
realisticVision-v51VAE_original_768x512_cn

Size:
768 x 512

Include in Image:
a white wild horse with reins

Exclude from Image:

Seed:
38536801

Steps:
20

Guidance Scale:
20.0

Scheduler:
DPM-Solver++

ML Compute Unit:
CPU & GPU

Open WebUI を Mac のログインで自動起動する方法 (pip と launchctl で。ログローテーションも添えて)

【2026/01/02 追記】macOS Tahoe 26 ではセキュリティが厳しくなったようで、~/Documents (書類) ディレクトリ内にあるスクリプトの自動起動がこれまで通りの方法ではできなくなりました (PermissionError: [Errno 1] Operation not permitted 等が表示され、plist が起動しない)。本記事公開時に使用していた~/Documents内の仮想環境を外に出したところ動くようになったので、パスを更新しています。【追記ここまで】

最近 Open WebUI をメインの LLM フロントエンドにしようとしているのですが、起動が遅いのが残念ポイントの一つ。とりあえずlaunchctlで macOS にログインしたときに自動で起動するようサービス化したら「起動待ち」ストレスがなくなりました。

Open WebUI に限らず CLI で動くものは全て同様の方法でサービスとして追加できるはずです。簡単かと思ったら微妙にハマったのでやりかたを共有します。ログローテーションの自動化も行っています。

【2025/12/31 追加】ログローテーションの設定ファイルの書式に誤りがあり、古いログが削除されない不具合がありました。内容を修正しています。

環境の準備

pipでインストールした Open WebUI (または何らかの CLI ツールなど) がある方は次に進んでください。

こらから構築する方は、何らかの Python 仮想環境を作り、pip install open-webuiでインストールしてください (~/Documents配下だとうまく動作しないため、本記事では~/Developerディレクトリを作り、その中に仮想環境を作っています)。

何でもいいから仮想環境の作り方をもっと詳しく教えてくれ、という方はこちら (↓) の記事を参考に、OpenWebUIフォルダにpipenv環境を作ると、以降の作業がちょっと楽かもしれません (↓が英語ページが開いたらこちらの日本語ページをどうぞ)。

Docker で Open WebUI を動かしている場合にどうするのかは知りませんごめんなさい。

Open WebUI の簡単な実行方法などはこちら (↓が英語ページが開いたらこちらの日本語ページをどうぞ):

設定ファイル (plist)

まず、本記事では自分がログインしたときにのみ実行するようにするので、ファイルの保存先は~/Library/LaunchAgents/になります。

以下に設定ファイルの例を貼っておきますのでご自身の環境に合わせて書き換え、com.openwebui.plistみたいな名前で保存してください。

変更が必要なところはハイライトしています。主にユーザ名 (handsomeのところ) やコマンドまでのパス、変更していればポート番号等ですかね。その他はコメントを見てもらえれば分かるかと思います。

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <!-- ラベル(サービス名) -->
    <key>Label</key>
    <string>com.openwebui</string>

    <!-- 実行するコマンド -->
    <key>ProgramArguments</key>
    <array>
        <!-- ここは open‑webui があるパスに合わせて変更 -->
        <string>/Users/handsome/Developer/OpenWebUI/.venv/bin/open-webui</string>
        <!-- 必要に応じてオプションを追加 -->
        <string>serve</string>
        <string>--host</string>
        <string>0.0.0.0</string>
        <string>--port</string>
        <string>8080</string>
    </array>

    <!-- 実行するディレクトリ -->
    <key>WorkingDirectory</key>
    <string>/Users/handsome/Developer/OpenWebUI</string>

    <!-- ログイン時に自動起動 -->
    <key>RunAtLoad</key>
    <true/>

    <!-- 失敗したら自動再起動 (正しく起動する事が確認できてから追加がよいでしょう) -->
    <key>KeepAlive</key>
    <true/>

    <!-- 標準出力・エラーをログファイルに書き込む -->
    <key>StandardOutPath</key>
    <string>/Users/handsome/Library/Logs/openwebui/openwebui.log</string>
    <key>StandardErrorPath</key>
    <string>/Users/handsome/Library/Logs/openwebui/openwebui.err</string>

</dict>
</plist>
com.openwebui.plist

ちなみにここでボクがハマったポイントは、全てフルパス (absolute path) で書く必要があったというところと、WorkingDirectoryが必要だった、というところです。

ログローテーションの準備

ログの書き出し・ローテーションは不要、という方は、上の設定ファイルの 34-38行を削除し、このセクションを飛ばして大丈夫です。

ログローテーション (たまったログを自動でアーカイブしたり削除したりという処理) は、macOS 標準のnewsyslogの力を借ります。ここでの作業は一部管理者権限が必要なのでご注意ください。

まずは、ログの保存先ディレクトリを作りましょう。

mkdir -p ~/Library/Logs/openwebui
Zsh

次にローテーションの定義ファイルを書きます。管理者権限でお好きなエディタで書いてください。viを使う場合は以下の様な感じす。最後の行だけあれば OK ですが、少なくとも上記で作ったフォルダのパスと、その後の owner (handsomeをご自身のユーザ名に) だけは書き換えてください。

sudo vi /etc/newsyslog.d/com.openwebui.conf
Zsh

内容は以下の感じで:

# logfilename        [owner:group] mode count size when flags [/pid_file]        [sig_num]
/Users/handsome/Library/Logs/openwebui/openwebui.log  handsome:staff  644 7   *   $D03    JG
/Users/handsome/Library/Logs/openwebui/openwebui.err  handsome:staff  644 7   *   $D03    JG
com.openwebui.conf

【2025/12/31 追加】過去の記事ではログローテーションの設定ファイルの書式に誤りがあり、古いログが削除されない不具合がありました。ログファイルの指定を修正しました。

※こちらの情報を大いに参考にさせていただきました: ログローテート / newsyslog (メモとかメモのようなものとか (By ルーキーの中のひと) 様、ありがとうございます!

設定している内容としては、以下となります:

  • /Users/handsome/Library/Logs/openwebui/openwebui.logopenwebui.errで対象のファイルを指定 (2025/12/31 追記: ファイル名をopenwebui.*とすると、古いファイルが削除されなため注意ください)
  • ログファイルのオーナーとグループを指定 (ユーザ名:staff) ← ここを間違うとエラーコード 78 で起動に失敗します!
  • パーミッションは644
  • 7世代残す
  • サイズは考慮しない (*)
  • 毎日 ($D) 03:00 時 AM (03) に実行
  • bzip2 で圧縮

(上に太字で書た通り、オプションとなっていますがowner:groupを書かなかったり間違えたりするとパーミッションの問題が発生しサービスが起動しません。ボクはハマりました)

構文のチェックと次回のローテーションの内容などは以下で確認できます:

sudo newsyslog -nv
Zsh

手動で実行し、指定した世代より古いファイルが削除されるか確認するには以下のコマンドを世代数以上実行してみてください。

sudo newsyslog -v -F -f /etc/newsyslog.d/com.openwebui.conf
Zsh

ログをリアルタイムで表示するには当然、tail -fが使えます:

tail -f ~/Library/Logs/openwebui/openwebui.log
Zsh

サービスの追加、確認、停止

先ほど書いた設定をユーザサービスとして追加 (ロード) します。永続的に設定する-wを忘れずに。

launchctl load -w ~/Library/LaunchAgents/com.openwebui.plist
Zsh

設定にKeepAliveを付けてあるのでロードした瞬間 Open WebUI が起動を始めるはずですが、試しにログアウト・ログインして確認しても良いでしょう。

サービスが動いているかどうかは以下のコマンドで確認できます:

launchctl list | grep openwebui
Zsh

以下は実行例です。一つ目がプロセス ID または- (停止中)で、次がステータスコード、最後がラベル名になります。この例では無事動いている状況が分かりますが、-の後に0以外の数字があると何らかの理由でサービスが起動に失敗しているということですので、ステータス (終了) コードをググって解決してください。

% launchctl list | grep openwebui
5932 0 com.openwebui

Open WebUI をアップデート (pip install -U open-webui) するときなどは、サービスを一端停止 (アンロード) します:

launchctl unload ~/Library/LaunchAgents/com.openwebui.plist
Zsh

アップデート後は再度ロードするなりログアウト・ログインでサービスが起動します。

永続的に解除する場合は-wを付けてあげます:

launchctl unload -w ~/Library/LaunchAgents/com.openwebui.plist
Zsh

以上でございます!

【雑記】で、Open WebUI はどうか

今のところ、あんまし良くないなーという感想です。全部の機能を使ったわけじゃないのであれですけど。

中でも特に良くないのは、LLM からのレスポンスが長いとブラウザでの表示がどんどん遅くなり、LLM の生成が終わっているのにまだトロトロと表示を続け、最悪のケースではそのうち表示が全くされなくなっちゃうことがある、ってところですね。LLM 側 (MLX-LM で実行) は生成がとっくに終わってるので、ブラウザへの描画が遅いのかな、と思います (Safari が顕著、Chrome でも発生)。

あと、なぜか gpt-oss で顕著なのですが、同じ文章のリピートが頻発します。Dify だとほぼ問題無いので、Open WebUI で「デフォルト」となっている設定のどれかが原因になっているのか、ちょっとまだ分かっていません。

モデルのデフォルト設定をいじる画面に行くまでが遠いのもちょっと面倒です。チャット画面でデフォルト設定が反映されているかどうかも分かりづらいし。例えばモデルの設定で max_tokens に数値を入れていたとしても、チャットコントロールで「デフォルト」をクリックすると 128 になっていて混乱します。

また、gpt-oss は reasoning/thinking の頭とケツに決まったタグが入るのでそこを隠すことができるのですが (別記事に書きました)、Qwen3-30B-A3B Thinking なんかだと思考の頭にはタグがありません。Reasoning Tags の End Tag だけに </think> を入れると、なぜか推論の後の生成部分も思考として隠されてしまいます。当然開かないと中身は見れません。

Issue をあげれば直してもらえそうですが、まだあまり Open WebUI にハマりきっておらず、ちょっと二の足を踏んでいる感じです。

Image by Stable Diffusion (Mochi Diffusion)

家事代行サービスは女性にお願いしたい、と思うのは差別的な考えでしょうか。せめて批判を避けようと、エプロンがアンミラのようになっていないのを選択。

Date:
2025年9月13日 0:29:40

Model:
realisticVision-v51VAE_original_768x512_cn

Size:
768 x 512

Include in Image:
house-keeping service lady

Exclude from Image:

Seed:
2859808184

Steps:
23

Guidance Scale:
20.0

Scheduler:
DPM-Solver++

ML Compute Unit:
CPU & GPU

gpt-oss の思考部分を隠すまともな方法と魔改造 (MLX-LM)

普段あまり使わない Open WebUI をいじっていたら、普通に gpt-oss の思考部分を隠せるとわかりました。さらに、2ラウンド以降のチャットも問題無く動きますね。まだサポートされていないと思っていたなんてお恥ずかしい。いやまったく、おハズです。せっかくなのでやり方共有します。これ以上おハズな人が増えないように。

しかし同様の方法は Dify + MLX-LM では使えません (よね?)。なので、Dify 用には MLX-LM API サーバの魔改造で対応します。

(2026/02/02 追記) 魔改造はもう不要!多分どんな環境でも動きそうな gpt-oss 専用の汎用プロキシサーバを書きました ↓

Open WebUI で gpt-oss の思考部分を隠す正攻法

簡単な話なので、ささっと行きましょう。

(1) 右上のユーザアイコン → 管理者パネル → 設定 → モデル → gpt-oss のモデル名または右にある鉛筆アイコンをクリック

(2) 高度なパラメータの右の「表示」→ Reasoning Tags の「デフォルト」をクリックして「カスタム」にし、以下の内容を入力:

Start Tag: <|channel|>analysis<|message|>

End Tag: <|end|><|start|>assistant<|channel|>final<|message|>

(3) 一番下にスクロールして「保存して更新

以下のスクリーンショットの様になっていれば OK です。

本件に直接関係ないですが、スクリーンショットでは他に、MLX の強みでコンテキスト長を最大にし (max_tokens: 131072)、同じ言葉がリピートする問題を抑えるための設定 (frequency_penalty: 0.4 と presence_penalty: 0.1) をしています←最適解が見つからず、試行錯誤中です。システムプロンプトのReasoning: Highは効かないですね (できました!↓)。同様に「推論の努力」をhighにしてもlowにしても gpt-oss に効いているのかはよくわかりません。

(2025/09/15 追記) 新たなる魔改造を持って MLX-LM でReasoning: Highを効かせることに成功しました → gpt-oss の Reasoning レベル変更を MLX-LM でもできるようにする魔改造

Reasoning Tags をカスタムにして入力

思考中の様子:

下向き v をクリックすると思考の様子が展開されます。完了すると思考にかかった時間が表示されます:

こうすると思考部分は LLM に送られなくなるようで、2ラウンド以降のチャットも問題無くできます。Open WebUI すばらしい (イマサラ)!Dify にも欲しいな、この設定。

えー?ホントー?好きになっちゃうー

MLX-LM を API サーバとして Open WebUI から使う方法はこちらの記事にまとめています。

Dify から MLX-LM API 経由の gpt-oss は本当に使えないのか確認

Dify の OpenAI-API-compatible プラグインの version 0.0.20 でテスト済みです。もしかしたら新しいバージョンではそもそも gpt-oss に対応済みの可能性もあるため、更新履歴を見てみましょう。0.0.21 と 0.0.22 が出てました。

ふむふむ、なるほど

何が変更されたのかわかりませんが、ともあれ問題が発生しても古いバージョンがインストールできそうなので、最新の 0.0.22 にしてみます。しかしモデルの追加・利用のそれぞれで設定可能な項目は以前と変わりませんでした。

プレビューでチャットしてみると思考部分は丸見えで 2ラウンド以降のチャットはエラー発生。ここも以前と変わらず。

はい、というわけで、魔改造の出番ですね。へっへっへ

Dify から MLX-LM API 経由の gpt-oss を使う魔改造

※ 動作確認済みのバージョンはMLX-LM: 0.27.0です。

本ブログでは何度もやっている、mlx-lm の server.py の改造ですが、今回はけっこうやっちゃってます。オリジナルのスクリプトは必ずバックアップしてからすすめてください。

前回記事からの変更内容は、言ってしまえば簡単で、<|channel|>analysis<|message|>から<|channel|>final<|message|>までを<details></details>で囲っているだけです。

サーバ側で思考部分を受け取っても廃棄する改造を行った前回記事はこちら:

スクリプト全てを見るにはここをクリック ★ (1194行あります)
# Copyright © 2023-2024 Apple Inc.

import argparse
import json
import logging
import platform
import socket
import time
import uuid
import warnings
from dataclasses import dataclass, field
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import (
    Any,
    Dict,
    List,
    Literal,
    NamedTuple,
    Optional,
    Sequence,
    Tuple,
    Union,
)

import mlx.core as mx
from huggingface_hub import scan_cache_dir

from ._version import __version__
from .generate import stream_generate
from .models.cache import can_trim_prompt_cache, make_prompt_cache, trim_prompt_cache
from .sample_utils import make_logits_processors, make_sampler
from .utils import common_prefix_len, load


def get_system_fingerprint():
    gpu_arch = mx.metal.device_info()["architecture"] if mx.metal.is_available() else ""
    return f"{__version__}-{mx.__version__}-{platform.platform()}-{gpu_arch}"


class StopCondition(NamedTuple):
    stop_met: bool
    trim_length: int


def stopping_criteria(
    tokens: List[int],
    stop_id_sequences: List[List[int]],
    eos_token_id: Union[int, None],
) -> StopCondition:
    """
    Determines whether the token generation should stop based on predefined
    conditions.

    Args:
        tokens (List[int]): The current sequence of generated tokens.
        stop_id_sequences (List[List[[int]]): A list of integer lists, each
          representing a sequence of token IDs. If the end of the `tokens`
          list matches any of these sequences, the generation should stop.
        eos_token_id (Union[int, None]): The token ID that represents the
          end-of-sequence. If the last token in `tokens` matches this, the
          generation should stop.

    Returns:
        StopCondition: A named tuple indicating whether the stop condition has
          been met (`stop_met`) and how many tokens should be trimmed from the
          end if it has (`trim_length`).
    """
    if tokens and tokens[-1] == eos_token_id:
        return StopCondition(stop_met=True, trim_length=0)

    for stop_ids in stop_id_sequences:
        if len(tokens) >= len(stop_ids):
            if tokens[-len(stop_ids) :] == stop_ids:
                return StopCondition(stop_met=True, trim_length=len(stop_ids))

    return StopCondition(stop_met=False, trim_length=0)


def sequence_overlap(s1: Sequence, s2: Sequence) -> bool:
    """
    Checks if a suffix of s1 has overlap with a prefix of s2

    Args:
        s1 (Sequence): The first sequence
        s2 (Sequence): The second sequence

    Returns:
        bool: If the two sequences have overlap
    """
    max_overlap = min(len(s1), len(s2))
    return any(s1[-i:] == s2[:i] for i in range(1, max_overlap + 1))


def convert_chat(messages: List[dict], role_mapping: Optional[dict] = None):
    default_role_mapping = {
        "system_prompt": (
            "A chat between a curious user and an artificial intelligence "
            "assistant. The assistant follows the given rules no matter what."
        ),
        "system": "ASSISTANT's RULE: ",
        "user": "USER: ",
        "assistant": "ASSISTANT: ",
        "stop": "\n",
    }
    role_mapping = role_mapping if role_mapping is not None else default_role_mapping

    prompt = ""
    for line in messages:
        role_prefix = role_mapping.get(line["role"], "")
        stop = role_mapping.get("stop", "")
        content = line.get("content", "")
        prompt += f"{role_prefix}{content}{stop}"

    prompt += role_mapping.get("assistant", "")
    return prompt.rstrip()


def process_message_content(messages):
    """
    Convert message content to a format suitable for `apply_chat_template`.

    The function operates on messages in place. It converts the 'content' field
    to a string instead of a list of text fragments.

    Args:
        message_list (list): A list of dictionaries, where each dictionary may
          have a 'content' key containing a list of dictionaries with 'type' and
          'text' keys.

    Raises:
        ValueError: If the 'content' type is not supported or if 'text' is missing.

    """
    for message in messages:
        content = message["content"]
        if isinstance(content, list):
            text_fragments = [
                fragment["text"] for fragment in content if fragment["type"] == "text"
            ]
            if len(text_fragments) != len(content):
                raise ValueError("Only 'text' content type is supported.")
            message["content"] = "".join(text_fragments)
        elif content is None:
            message["content"] = ""


@dataclass
class PromptCache:
    cache: List[Any] = field(default_factory=list)
    model_key: Tuple[str, Optional[str]] = ("", None, None)
    tokens: List[int] = field(default_factory=list)


class ModelProvider:
    def __init__(self, cli_args: argparse.Namespace):
        """Load models on demand and persist them across the whole process."""
        self.cli_args = cli_args
        self.model_key = None
        self.model = None
        self.tokenizer = None
        self.draft_model = None

        # Preload the default model if it is provided
        self.default_model_map = {}
        if self.cli_args.model is not None:
            self.default_model_map[self.cli_args.model] = "default_model"
            self.load(self.cli_args.model, draft_model_path="default_model")

    def _validate_model_path(self, model_path: str):
        model_path = Path(model_path)
        if model_path.exists() and not model_path.is_relative_to(Path.cwd()):
            raise RuntimeError(
                "Local models must be relative to the current working dir."
            )

    # Added in adapter_path to load dynamically
    def load(self, model_path, adapter_path=None, draft_model_path=None):
        model_path, adapter_path, draft_model_path = map(
            lambda s: s.lower() if s else None,
            (model_path, adapter_path, draft_model_path),
        )

        model_path = self.default_model_map.get(model_path, model_path)
        if self.model_key == (model_path, adapter_path, draft_model_path):
            return self.model, self.tokenizer

        # Remove the old model if it exists.
        self.model = None
        self.tokenizer = None
        self.model_key = None
        self.draft_model = None

        # Building tokenizer_config
        tokenizer_config = {
            "trust_remote_code": True if self.cli_args.trust_remote_code else None
        }
        if self.cli_args.chat_template:
            tokenizer_config["chat_template"] = self.cli_args.chat_template

        if model_path == "default_model":
            if self.cli_args.model is None:
                raise ValueError(
                    "A model path has to be given as a CLI "
                    "argument or in the HTTP request"
                )
            adapter_path = adapter_path or self.cli_args.adapter_path
            model, tokenizer = load(
                self.cli_args.model,
                adapter_path=adapter_path,
                tokenizer_config=tokenizer_config,
            )
        else:
            self._validate_model_path(model_path)
            model, tokenizer = load(
                model_path, adapter_path=adapter_path, tokenizer_config=tokenizer_config
            )

        if self.cli_args.use_default_chat_template:
            if tokenizer.chat_template is None:
                tokenizer.chat_template = tokenizer.default_chat_template

        self.model_key = (model_path, adapter_path, draft_model_path)
        self.model = model
        self.tokenizer = tokenizer

        def validate_draft_tokenizer(draft_tokenizer):
            # Check if tokenizers are compatible
            if draft_tokenizer.vocab_size != tokenizer.vocab_size:
                logging.warning(
                    "Draft model tokenizer does not match model tokenizer. "
                    "Speculative decoding may not work as expected."
                )

        # Load draft model if specified
        if (
            draft_model_path == "default_model"
            and self.cli_args.draft_model is not None
        ):
            self.draft_model, draft_tokenizer = load(self.cli_args.draft_model)
            validate_draft_tokenizer(draft_tokenizer)

        elif draft_model_path is not None and draft_model_path != "default_model":
            self._validate_model_path(draft_model_path)
            self.draft_model, draft_tokenizer = load(draft_model_path)
            validate_draft_tokenizer(draft_tokenizer)
        return self.model, self.tokenizer


class APIHandler(BaseHTTPRequestHandler):
    def __init__(
        self,
        model_provider: ModelProvider,
        *args,
        prompt_cache: Optional[PromptCache] = None,
        system_fingerprint: Optional[str] = None,
        **kwargs,
    ):
        """
        Create static request specific metadata
        """
        self.created = int(time.time())
        self.model_provider = model_provider
        self.prompt_cache = prompt_cache or PromptCache()
        self.system_fingerprint = system_fingerprint or get_system_fingerprint()
        super().__init__(*args, **kwargs)

    def _set_cors_headers(self):
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "*")
        self.send_header("Access-Control-Allow-Headers", "*")

    def _set_completion_headers(self, status_code: int = 200):
        self.send_response(status_code)
        self.send_header("Content-type", "application/json")
        self._set_cors_headers()

    def _set_stream_headers(self, status_code: int = 200):
        self.send_response(status_code)
        self.send_header("Content-type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self._set_cors_headers()

    def do_OPTIONS(self):
        self._set_completion_headers(204)
        self.end_headers()

    def do_POST(self):
        """
        Respond to a POST request from a client.
        """
        endpoints = {
            "/v1/completions": self.handle_text_completions,
            "/v1/chat/completions": self.handle_chat_completions,
            "/chat/completions": self.handle_chat_completions,
        }

        if self.path not in endpoints:
            self._set_completion_headers(404)
            self.end_headers()
            self.wfile.write(b"Not Found")
            return

        # Fetch and parse request body
        content_length = int(self.headers["Content-Length"])
        raw_body = self.rfile.read(content_length)
        try:
            self.body = json.loads(raw_body.decode())
        except json.JSONDecodeError as e:
            logging.error(f"JSONDecodeError: {e} - Raw body: {raw_body.decode()}")
            # Set appropriate headers based on streaming requirement
            if self.stream:
                self._set_stream_headers(400)
                self.wfile.write(
                    f"data: {json.dumps({'error': f'Invalid JSON in request body: {e}'})}\n\n".encode()
                )
            else:
                self._set_completion_headers(400)
                self.wfile.write(
                    json.dumps({"error": f"Invalid JSON in request body: {e}"}).encode()
                )
            return

        indent = "\t"  # Backslashes can't be inside of f-strings
        logging.debug(f"Incoming Request Body: {json.dumps(self.body, indent=indent)}")
        assert isinstance(
            self.body, dict
        ), f"Request should be dict, but got {type(self.body)}"

        # Extract request parameters from the body
        self.stream = self.body.get("stream", False)
        self.stream_options = self.body.get("stream_options", None)
        self.requested_model = self.body.get("model", "default_model")
        self.requested_draft_model = self.body.get("draft_model", "default_model")
        self.num_draft_tokens = self.body.get(
            "num_draft_tokens", self.model_provider.cli_args.num_draft_tokens
        )
        self.adapter = self.body.get("adapters", None)
        self.max_tokens = self.body.get("max_completion_tokens", None)
        if self.max_tokens is None:
            self.max_tokens = self.body.get(
                "max_tokens", self.model_provider.cli_args.max_tokens
            )
        self.temperature = self.body.get(
            "temperature", self.model_provider.cli_args.temp
        )
        self.top_p = self.body.get("top_p", self.model_provider.cli_args.top_p)
        self.top_k = self.body.get("top_k", self.model_provider.cli_args.top_k)
        self.min_p = self.body.get("min_p", self.model_provider.cli_args.min_p)
        self.repetition_penalty = self.body.get("repetition_penalty", 1.0)
        self.repetition_context_size = self.body.get("repetition_context_size", 20)
        self.xtc_probability = self.body.get("xtc_probability", 0.0)
        self.xtc_threshold = self.body.get("xtc_threshold", 0.0)
        self.logit_bias = self.body.get("logit_bias", None)
        self.logprobs = self.body.get("logprobs", -1)
        self.validate_model_parameters()
        # Load the model if needed
        try:
            self.model, self.tokenizer = self.model_provider.load(
                self.requested_model,
                self.adapter,
                self.requested_draft_model,
            )
        except:
            self._set_completion_headers(404)
            self.end_headers()
            self.wfile.write(b"Not Found")
            return

        # Get stop id sequences, if provided
        stop_words = self.body.get("stop")
        stop_words = stop_words or []
        stop_words = [stop_words] if isinstance(stop_words, str) else stop_words
        stop_id_sequences = [
            self.tokenizer.encode(stop_word, add_special_tokens=False)
            for stop_word in stop_words
        ]

        # Send header type
        (
            self._set_stream_headers(200)
            if self.stream
            else self._set_completion_headers(200)
        )

        # Call endpoint specific method
        prompt = endpoints[self.path]()
        self.handle_completion(prompt, stop_id_sequences)

    def validate_model_parameters(self):
        """
        Validate the model parameters passed in the request for the correct types and values.
        """
        if not isinstance(self.stream, bool):
            raise ValueError("stream must be a boolean")

        if not isinstance(self.max_tokens, int) or self.max_tokens < 0:
            raise ValueError("max_tokens must be a non-negative integer")

        if not isinstance(self.temperature, (float, int)) or self.temperature < 0:
            raise ValueError("temperature must be a non-negative float")

        if not isinstance(self.top_p, (float, int)) or self.top_p < 0 or self.top_p > 1:
            raise ValueError("top_p must be a float between 0 and 1")

        if not isinstance(self.top_k, int) or self.top_k < 0:
            raise ValueError("top_k must be a non-negative integer")

        if not isinstance(self.min_p, (float, int)) or self.min_p < 0 or self.min_p > 1:
            raise ValueError("min_p must be a float between 0 and 1")

        if not isinstance(self.num_draft_tokens, int) or self.num_draft_tokens < 0:
            raise ValueError("num_draft_tokens must be a non-negative integer")

        if (
            not isinstance(self.repetition_penalty, (float, int))
            or self.repetition_penalty < 0
        ):
            raise ValueError("repetition_penalty must be a non-negative float")

        if self.logprobs != -1 and not (0 < self.logprobs <= 10):
            raise ValueError(
                f"logprobs must be between 1 and 10 but got {self.logprobs:,}"
            )

        if (
            not isinstance(self.repetition_context_size, int)
            or self.repetition_context_size < 0
        ):
            raise ValueError("repetition_context_size must be a non-negative integer")

        if self.logit_bias is not None:
            if not isinstance(self.logit_bias, dict):
                raise ValueError("logit_bias must be a dict of int to float")

            try:
                self.logit_bias = {int(k): v for k, v in self.logit_bias.items()}
            except ValueError:
                raise ValueError("logit_bias must be a dict of int to float")
        if not (
            isinstance(self.xtc_probability, float)
            and 0.00 <= self.xtc_probability <= 1.00
        ):
            raise ValueError(f"xtc_probability must be a float between 0.00 and 1.00")
        if not (
            isinstance(self.xtc_threshold, float) and 0.00 <= self.xtc_threshold <= 0.50
        ):
            raise ValueError(f"xtc_threshold must be a float between 0.00 and 0.5")
        if not isinstance(self.requested_model, str):
            raise ValueError("model must be a string")
        if self.adapter is not None and not isinstance(self.adapter, str):
            raise ValueError("adapter must be a string")

    def generate_response(
        self,
        text: str,
        finish_reason: Union[Literal["length", "stop"], None],
        prompt_token_count: Optional[int] = None,
        completion_token_count: Optional[int] = None,
        token_logprobs: Optional[List[float]] = None,
        top_tokens: Optional[List[Dict[int, float]]] = None,
        tokens: Optional[List[int]] = None,
        tool_calls: Optional[List[str]] = None,
    ) -> dict:
        """
        Generate a single response packet based on response type (stream or
        not), completion type and parameters.

        Args:
            text (str): Text generated by model
            finish_reason (Union[Literal["length", "stop"], None]): The reason the
              response is being sent: "length", "stop" or `None`.
            prompt_token_count (Optional[int]): The number of tokens in the prompt,
              used to populate the "usage" field (not used when stream).
            completion_token_count (Optional[int]): The number of tokens in the
              response, used to populate the "usage" field (not used when stream).
            token_logprobs (Optional[List[float]]): The log probabilities per token,
              in token order.
            top_tokens (Optional[List[Dict[int, float]]]): List of dictionaries mapping
              tokens to logprobs for the top N tokens at each token position.
            tokens (Optional[List[int]]): List of tokens to return with logprobs structure
            tool_calls (Optional[List[str]]): List of tool calls.

        Returns:
            dict: A dictionary containing the response, in the same format as
              OpenAI's API.
        """
        token_logprobs = token_logprobs or []
        top_logprobs = top_tokens or []
        tool_calls = tool_calls or []

        def parse_function(tool_text):
            tool_call = json.loads(tool_text.strip())
            return {
                "function": {
                    "name": tool_call.get("name", None),
                    "arguments": json.dumps(tool_call.get("arguments", "")),
                },
                "type": "function",
                "id": None,
            }

        # Static response
        response = {
            "id": self.request_id,
            "system_fingerprint": self.system_fingerprint,
            "object": self.object_type,
            "model": self.requested_model,
            "created": self.created,
            "choices": [
                {
                    "index": 0,
                    "finish_reason": finish_reason,
                },
            ],
        }

        if token_logprobs or top_logprobs or tokens:
            response["choices"][0]["logprobs"] = {
                "token_logprobs": token_logprobs,
                "top_logprobs": top_logprobs,
                "tokens": tokens,
            }

        if not self.stream:
            if not (
                isinstance(prompt_token_count, int)
                and isinstance(completion_token_count, int)
            ):
                raise ValueError(
                    "Response type is complete, but token counts not provided"
                )

            response["usage"] = {
                "prompt_tokens": prompt_token_count,
                "completion_tokens": completion_token_count,
                "total_tokens": prompt_token_count + completion_token_count,
            }

        choice = response["choices"][0]

        # Add dynamic response
        if self.object_type.startswith("chat.completion"):
            key_name = "delta" if self.stream else "message"
            choice[key_name] = {
                "role": "assistant",
                "content": text,
                "tool_calls": [parse_function(tool_text) for tool_text in tool_calls],
            }
        elif self.object_type == "text_completion":
            choice.update(text=text)
        else:
            raise ValueError(f"Unsupported response type: {self.object_type}")

        return response

    def reset_prompt_cache(self, prompt):
        """Resets the prompt cache and associated state.

        Args:
            prompt (List[int]): The tokenized new prompt which will populate the
                reset cache.
        """
        logging.debug(f"*** Resetting cache. ***")
        self.prompt_cache.model_key = self.model_provider.model_key
        self.prompt_cache.cache = make_prompt_cache(self.model_provider.model)
        if self.model_provider.draft_model is not None:
            self.prompt_cache.cache += make_prompt_cache(
                self.model_provider.draft_model
            )
        self.prompt_cache.tokens = list(prompt)  # Cache the new prompt fully

    def get_prompt_cache(self, prompt):
        """
        Determines the portion of the prompt that needs processing by comparing
        it to the cached prompt and attempting to reuse the common prefix.

        This function updates the internal prompt cache state (tokens and model cache)
        based on the comparison. If a common prefix exists, it attempts to trim
        the model cache (if supported) to match the common prefix length, avoiding
        recomputation.

        Args:
            prompt (List[int]): The tokenized new prompt.

        Returns:
            List[int]: The suffix of the prompt that actually needs to be processed
                       by the model. This will be the full prompt if the cache is
                       reset or cannot be effectively used.
        """
        cache_len = len(self.prompt_cache.tokens)
        prompt_len = len(prompt)
        com_prefix_len = common_prefix_len(self.prompt_cache.tokens, prompt)

        # Leave at least one token in the prompt
        com_prefix_len = min(com_prefix_len, len(prompt) - 1)

        # Condition 1: Model changed or no common prefix at all. Reset cache.
        if (
            self.prompt_cache.model_key != self.model_provider.model_key
            or com_prefix_len == 0
        ):
            self.reset_prompt_cache(prompt)

        # Condition 2: Common prefix exists and matches cache length. Process suffix.
        elif com_prefix_len == cache_len:
            logging.debug(
                f"*** Cache is prefix of prompt (cache_len: {cache_len}, prompt_len: {prompt_len}). Processing suffix. ***"
            )
            prompt = prompt[com_prefix_len:]
            self.prompt_cache.tokens.extend(prompt)

        # Condition 3: Common prefix exists but is shorter than cache length. Attempt trim.
        elif com_prefix_len < cache_len:
            logging.debug(
                f"*** Common prefix ({com_prefix_len}) shorter than cache ({cache_len}). Attempting trim. ***"
            )

            if can_trim_prompt_cache(self.prompt_cache.cache):
                num_to_trim = cache_len - com_prefix_len
                logging.debug(f"    Trimming {num_to_trim} tokens from cache.")
                trim_prompt_cache(self.prompt_cache.cache, num_to_trim)
                self.prompt_cache.tokens = self.prompt_cache.tokens[:com_prefix_len]
                prompt = prompt[com_prefix_len:]
                self.prompt_cache.tokens.extend(prompt)
            else:
                logging.debug(f"    Cache cannot be trimmed. Resetting cache.")
                self.reset_prompt_cache(prompt)

        # This case should logically not be reached if com_prefix_len <= cache_len
        else:
            logging.error(
                f"Unexpected cache state: com_prefix_len ({com_prefix_len}) > cache_len ({cache_len}). Resetting cache."
            )
            self.reset_prompt_cache(prompt)

        logging.debug(f"Returning {len(prompt)} tokens for processing.")
        return prompt

    def handle_completion(
        self,
        prompt: List[int],
        stop_id_sequences: List[List[int]],
    ):
        """
        Generate a response to a prompt and send it to the client in a single batch.

        Args:
            prompt (List[int]): The tokenized prompt.
            stop_id_sequences (List[List[int]]): A list of stop words passed
                to the stopping_criteria function
        """
        tokens = []
        finish_reason = "length"
        stop_sequence_suffix = None
        if self.stream:
            self.end_headers()
            logging.debug(f"Starting stream:")
        else:
            logging.debug(f"Starting completion:")
        token_logprobs = []
        top_tokens = []

        prompt = self.get_prompt_cache(prompt)

        text = ""
        tic = time.perf_counter()
        sampler = make_sampler(
            self.temperature,
            top_p=self.top_p,
            top_k=self.top_k,
            min_p=self.min_p,
            xtc_probability=self.xtc_probability,
            xtc_threshold=self.xtc_threshold,
            xtc_special_tokens=[
                self.tokenizer.eos_token_id,
                self.tokenizer.encode("\n"),
            ],
        )
        logits_processors = make_logits_processors(
            self.logit_bias,
            self.repetition_penalty,
            self.repetition_context_size,
        )

        tool_calls = []
        tool_text = ""
        in_tool_call = False
        segment = ""

        # --- ▼▼▼ ここから追加 ▼▼▼ ---
        # レスポンス形式を整形するための状態管理変数を初期化
        gemma_buffer = ""
        # 状態: INITIAL -> BUFFERING -> AWAITING_FINAL -> STREAMING
        gemma_state = "INITIAL"
        # --- ▲▲▲ ここまで追加 ▲▲▲ ---

        # Create keepalive callback to send SSE comments during long prompt processing
        def keepalive_callback(processed_tokens, total_tokens):
            logging.info(
                f"Prompt processing progress: {processed_tokens}/{total_tokens}"
            )
            if self.stream:
                try:
                    # Send SSE comment for keepalive - invisible to clients but keeps connection alive
                    self.wfile.write(
                        f": keepalive {processed_tokens}/{total_tokens}\n\n".encode()
                    )
                    self.wfile.flush()
                except (BrokenPipeError, ConnectionResetError, OSError):
                    # Client disconnected, ignore
                    pass

        for gen_response in stream_generate(
            model=self.model,
            tokenizer=self.tokenizer,
            prompt=prompt,
            max_tokens=self.max_tokens,
            sampler=sampler,
            logits_processors=logits_processors,
            prompt_cache=self.prompt_cache.cache,
            draft_model=self.model_provider.draft_model,
            num_draft_tokens=self.num_draft_tokens,
            prompt_progress_callback=keepalive_callback,
        ):
            logging.debug(gen_response.text)

            if (
                self.tokenizer.has_tool_calling
                and gen_response.text == self.tokenizer.tool_call_start
            ):
                in_tool_call = True
            elif in_tool_call:
                if gen_response.text == self.tokenizer.tool_call_end:
                    tool_calls.append(tool_text)
                    tool_text = ""
                    in_tool_call = False
                else:
                    tool_text += gen_response.text
            else:
                # --- ▼▼▼ ここから変更 ▼▼▼ ---
                # ストリーミングが有効、かつツールコール中でない場合に整形処理を実行
                if self.stream and not in_tool_call:
                    gemma_buffer += gen_response.text
                    segment_to_send = ""

                    # 状態: 初期状態。レスポンス形式を判定する
                    if gemma_state == "INITIAL":
                        if "<|channel|>" in gemma_buffer:
                            gemma_state = "BUFFERING"
                        elif len(gemma_buffer) > 11:  # len("<|channel|>")
                            gemma_state = "STREAMING"

                    # 状態: バッファリング中。analysisシーケンスを探す
                    if gemma_state == "BUFFERING":
                        analysis_seq = "<|channel|>analysis<|message|>"
                        if analysis_seq in gemma_buffer:
                            segment_to_send = gemma_buffer.replace(analysis_seq, f"<details>{analysis_seq}")
                            gemma_buffer = ""
                            gemma_state = "AWAITING_FINAL"

                    # 状態: finalシーケンスを待機中
                    if gemma_state == "AWAITING_FINAL":
                        final_seq = "<|channel|>final<|message|>"
                        if final_seq in gemma_buffer:
                            segment_to_send += gemma_buffer.replace(final_seq, f"{final_seq}</details>  ")
                            gemma_buffer = ""
                            gemma_state = "STREAMING"
                        else:
                            # シーケンスがトークン境界で分割される可能性を考慮し、
                            # バッファの末尾(シーケンス長-1)文字を残して送信
                            safe_flush_len = len(gemma_buffer) - (len(final_seq) - 1)
                            if safe_flush_len > 0:
                                segment_to_send += gemma_buffer[:safe_flush_len]
                                gemma_buffer = gemma_buffer[safe_flush_len:]

                    # 状態: 通常ストリーミング。バッファをすべて送信
                    if gemma_state == "STREAMING":
                        segment_to_send += gemma_buffer
                        gemma_buffer = ""

                    # 処理したテキストを送信セグメントと全体テキストに追加
                    segment += segment_to_send
                    text += segment_to_send
                else:
                    # ストリーミングでない場合やツールコール中は元の動作
                    text += gen_response.text
                    segment += gen_response.text
                # --- ▲▲▲ ここまで変更 ▲▲▲ ---
            token = gen_response.token
            logprobs = gen_response.logprobs
            tokens.append(token)
            self.prompt_cache.tokens.append(token)

            if self.logprobs > 0:
                sorted_indices = mx.argpartition(-logprobs, kth=self.logprobs - 1)
                top_indices = sorted_indices[: self.logprobs]
                top_logprobs = logprobs[top_indices]
                top_token_info = zip(top_indices.tolist(), top_logprobs.tolist())
                top_tokens.append(tuple(top_token_info))

            token_logprobs.append(logprobs[token].item())

            stop_condition = stopping_criteria(
                tokens, stop_id_sequences, self.tokenizer.eos_token_id
            )
            if stop_condition.stop_met:
                finish_reason = "stop"
                if stop_condition.trim_length:
                    stop_sequence_suffix = self.tokenizer.decode(
                        tokens[-stop_condition.trim_length :]
                    )
                    text = text[: -len(stop_sequence_suffix)]
                segment = ""
                break

            if self.stream and not in_tool_call:
                # If the end of tokens overlaps with a stop sequence, generate new
                # tokens until we know if the stop sequence is hit or not
                if any(
                    (
                        sequence_overlap(tokens, sequence)
                        for sequence in stop_id_sequences
                    )
                ):
                    continue
                elif segment or tool_calls:
                    response = self.generate_response(
                        segment, None, tool_calls=tool_calls
                    )
                    self.wfile.write(f"data: {json.dumps(response)}\n\n".encode())
                    self.wfile.flush()
                    segment = ""
                    tool_calls = []

        # --- ▼▼▼ ここから追加 ▼▼▼ ---
        # ループ終了後、バッファにデータが残っていれば最後のセグメントに追加
        if gemma_buffer:
            segment += gemma_buffer
            gemma_buffer = ""
        # --- ▲▲▲ ここまで追加 ▲▲▲ ---

        if gen_response.finish_reason is not None:
            finish_reason = gen_response.finish_reason

        logging.debug(f"Prompt: {gen_response.prompt_tps:.3f} tokens-per-sec")
        logging.debug(f"Generation: {gen_response.generation_tps:.3f} tokens-per-sec")
        logging.debug(f"Peak memory: {gen_response.peak_memory:.3f} GB")

        if self.stream:
            response = self.generate_response(
                segment, finish_reason, tool_calls=tool_calls
            )
            self.wfile.write(f"data: {json.dumps(response)}\n\n".encode())
            self.wfile.flush()
            if self.stream_options is not None and self.stream_options["include_usage"]:
                original_prompt_length = (
                    len(self.prompt_cache.tokens) - len(tokens) + len(prompt)
                )
                response = self.completion_usage_response(
                    original_prompt_length, len(tokens)
                )
                self.wfile.write(f"data: {json.dumps(response)}\n\n".encode())
                self.wfile.flush()
            self.wfile.write("data: [DONE]\n\n".encode())
            self.wfile.flush()
        else:
            response = self.generate_response(
                text,
                finish_reason,
                len(prompt),
                len(tokens),
                token_logprobs=token_logprobs,
                top_tokens=top_tokens,
                tokens=tokens,
                tool_calls=tool_calls,
            )
            response_json = json.dumps(response).encode()
            indent = "\t"  # Backslashes can't be inside of f-strings
            logging.debug(f"Outgoing Response: {json.dumps(response, indent=indent)}")

            # Send an additional Content-Length header when it is known
            self.send_header("Content-Length", str(len(response_json)))
            self.end_headers()
            self.wfile.write(response_json)
            self.wfile.flush()

    def completion_usage_response(
        self,
        prompt_token_count: Optional[int] = None,
        completion_token_count: Optional[int] = None,
    ):
        response = {
            "id": self.request_id,
            "system_fingerprint": self.system_fingerprint,
            "object": "chat.completion",
            "model": self.requested_model,
            "created": self.created,
            "choices": [],
            "usage": {
                "prompt_tokens": prompt_token_count,
                "completion_tokens": completion_token_count,
                "total_tokens": prompt_token_count + completion_token_count,
            },
        }
        return response

    def handle_chat_completions(self) -> List[int]:
        """
        Handle a chat completion request.

        Returns:
            mx.array: A mx.array of the tokenized prompt from the request body
        """
        body = self.body
        assert "messages" in body, "Request did not contain messages"

        # Determine response type
        self.request_id = f"chatcmpl-{uuid.uuid4()}"
        self.object_type = "chat.completion.chunk" if self.stream else "chat.completion"
        if self.tokenizer.chat_template:
            messages = body["messages"]

            # --- Changes from here ---
            # Modify message based on the `mlx-lm` chat template.
            for message in messages:
                if message["role"] == "assistant":
                    content = message.get("content", "")
                    if "<|channel|>analysis<|message|>" in content and "<|channel|>final<|message|>" in content:
                        try:
                            analysis_start_tag = "<|channel|>analysis<|message|>"
                            analysis_end_tag = "<|end|>"
                            final_start_tag = "<|channel|>final<|message|>"

                            analysis_start = content.find(analysis_start_tag) + len(analysis_start_tag)
                            analysis_end = content.find(analysis_end_tag)
                            final_start = content.find(final_start_tag) + len(final_start_tag)

                            analysis = content[analysis_start:analysis_end].strip()
                            final = content[final_start:].strip()

                            message["content"] = final
                            message["thinking"] = analysis
                        except Exception as e:
                            logging.error(f"Failed to parse assistant message with analysis/final tags: {e}")
                            # If parsing fails, leave the content and empty thinking
                            message["thinking"] = ""
            # --- to here ---

            process_message_content(messages)

            # Moved response_format before `apply_chat_template`
            if body.get("response_format", {}).get("type") == "json_object":
                if self.tokenizer.chat_template is None:
                    raise ValueError("JSON response format requested, but tokenizer has no chat template. Consider using `--use-default-chat-template`")
                messages.append({"role": "user", "content": self.tokenizer.json_schema_prompt})

            prompt = self.tokenizer.apply_chat_template(
                messages,
                body.get("tools") or None,
                add_generation_prompt=True,
                **self.model_provider.cli_args.chat_template_args,
            )
        else:
            prompt = convert_chat(body["messages"], body.get("role_mapping"))
            prompt = self.tokenizer.encode(prompt)

        return prompt

    def handle_text_completions(self) -> List[int]:
        """
        Handle a text completion request.

        Returns:
            mx.array: A mx.array of the tokenized prompt from the request body
        """
        # Determine response type
        self.request_id = f"cmpl-{uuid.uuid4()}"
        self.object_type = "text_completion"
        assert "prompt" in self.body, "Request did not contain a prompt"
        return self.tokenizer.encode(self.body["prompt"])

    def do_GET(self):
        """
        Respond to a GET request from a client.
        """
        if self.path.startswith("/v1/models"):
            self.handle_models_request()
        elif self.path == "/health":
            self.handle_health_check()
        else:
            self._set_completion_headers(404)
            self.end_headers()
            self.wfile.write(b"Not Found")

    def handle_health_check(self):
        """
        Handle a GET request for the /health endpoint.
        """
        self._set_completion_headers(200)
        self.end_headers()

        self.wfile.write('{"status": "ok"}'.encode())
        self.wfile.flush()

    def handle_models_request(self):
        """
        Handle a GET request for the /v1/models endpoint.
        """
        self._set_completion_headers(200)
        self.end_headers()

        files = ["config.json", "model.safetensors.index.json", "tokenizer_config.json"]

        parts = self.path.split("/")
        filter_repo_id = None
        if len(parts) > 3:
            filter_repo_id = "/".join(parts[3:])

        def probably_mlx_lm(repo):
            if repo.repo_type != "model":
                return False
            if "main" not in repo.refs:
                return False
            if filter_repo_id is not None and repo.repo_id != filter_repo_id:
                return False
            file_names = {f.file_path.name for f in repo.refs["main"].files}
            return all(f in file_names for f in files)

        # Scan the cache directory for downloaded mlx models
        hf_cache_info = scan_cache_dir()
        downloaded_models = [
            repo for repo in hf_cache_info.repos if probably_mlx_lm(repo)
        ]

        # Create a list of available models
        models = [
            {
                "id": repo.repo_id,
                "object": "model",
                "created": self.created,
            }
            for repo in downloaded_models
        ]

        response = {"object": "list", "data": models}

        response_json = json.dumps(response).encode()
        self.wfile.write(response_json)
        self.wfile.flush()


def run(
    host: str,
    port: int,
    model_provider: ModelProvider,
    server_class=HTTPServer,
    handler_class=APIHandler,
):
    server_address = (host, port)
    prompt_cache = PromptCache()
    infos = socket.getaddrinfo(
        *server_address, type=socket.SOCK_STREAM, flags=socket.AI_PASSIVE
    )
    server_class.address_family, _, _, _, server_address = next(iter(infos))
    httpd = server_class(
        server_address,
        lambda *args, **kwargs: handler_class(
            model_provider,
            prompt_cache=prompt_cache,
            system_fingerprint=get_system_fingerprint(),
            *args,
            **kwargs,
        ),
    )
    warnings.warn(
        "mlx_lm.server is not recommended for production as "
        "it only implements basic security checks."
    )
    logging.info(f"Starting httpd at {host} on port {port}...")
    httpd.serve_forever()


def main():
    parser = argparse.ArgumentParser(description="MLX Http Server.")
    parser.add_argument(
        "--model",
        type=str,
        help="The path to the MLX model weights, tokenizer, and config",
    )
    parser.add_argument(
        "--adapter-path",
        type=str,
        help="Optional path for the trained adapter weights and config.",
    )
    parser.add_argument(
        "--host",
        type=str,
        default="127.0.0.1",
        help="Host for the HTTP server (default: 127.0.0.1)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8080,
        help="Port for the HTTP server (default: 8080)",
    )
    parser.add_argument(
        "--draft-model",
        type=str,
        help="A model to be used for speculative decoding.",
        default=None,
    )
    parser.add_argument(
        "--num-draft-tokens",
        type=int,
        help="Number of tokens to draft when using speculative decoding.",
        default=3,
    )
    parser.add_argument(
        "--trust-remote-code",
        action="store_true",
        help="Enable trusting remote code for tokenizer",
    )
    parser.add_argument(
        "--log-level",
        type=str,
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set the logging level (default: INFO)",
    )
    parser.add_argument(
        "--chat-template",
        type=str,
        default="",
        help="Specify a chat template for the tokenizer",
        required=False,
    )
    parser.add_argument(
        "--use-default-chat-template",
        action="store_true",
        help="Use the default chat template",
    )
    parser.add_argument(
        "--temp",
        type=float,
        default=0.0,
        help="Default sampling temperature (default: 0.0)",
    )
    parser.add_argument(
        "--top-p",
        type=float,
        default=1.0,
        help="Default nucleus sampling top-p (default: 1.0)",
    )
    parser.add_argument(
        "--top-k",
        type=int,
        default=0,
        help="Default top-k sampling (default: 0, disables top-k)",
    )
    parser.add_argument(
        "--min-p",
        type=float,
        default=0.0,
        help="Default min-p sampling (default: 0.0, disables min-p)",
    )
    parser.add_argument(
        "--max-tokens",
        type=int,
        default=512,
        help="Default maximum number of tokens to generate (default: 512)",
    )
    parser.add_argument(
        "--chat-template-args",
        type=json.loads,
        help="""A JSON formatted string of arguments for the tokenizer's apply_chat_template, e.g. '{"enable_thinking":false}'""",
        default="{}",
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=getattr(logging, args.log_level.upper(), None),
        format="%(asctime)s - %(levelname)s - %(message)s",
    )
    run(args.host, args.port, ModelProvider(args))


if __name__ == "__main__":
    print(
        "Calling `python -m mlx_lm.server...` directly is deprecated."
        " Use `mlx_lm.server...` or `python -m mlx_lm server ...` instead."
    )
    main()

変更箇所は全てAPIHandlerクラスのhandle_completion内です。コメントが入っているのでわかりやすいかと思います。

使い方

仮想環境内の mlx_lm フォルダに入って、server.py をバックアップします。例えばこんな感じで:

find . -name mlx_lm
(出力例)
./.venv/lib/python3.12/site-packages/mlx_lm
cd ./.venv/lib/python3.12/site-packages/mlx_lm
cp server.py server.py.original

そして server.py に上記「スクリプト全てを見るにはここをクリック」の中身をコピペしてサーバを実行します。

コピーはスクリプトの右上のコピーボタンを使い、ペーストはこんな感じ ↓ でやってみてください。クリップボードの中身をファイルに書き出してくれます (最近知ってうれしかったので紹介)。多分pbpaste > server.pyでも動作に問題無いと思うのですが、最後に改行を付けています。

下のコマンドもコピペする場合の順番としては、まず下のコマンドをターミナルにコピペ (まだエンターキー押下せず)、「スクリプト全てを見るにはここを開いてください」の中身をコピー、ターミナルに戻ってエンターキー押下、でイケると思います。

printf '%s\n' "$(pbpaste)" > server.py

さて、これで MLX-LM の API サーバを立ち上げ (例: mlx_lm.server --host 0.0.0.0 --port 9999 --log-level DEBUG) うまくいくと、思考部分が「▶詳細」の中にしまわれ、回答のみがスッキリと表示されるようになります。もちろん思考の内容は「▶詳細」クリックすると開けます。

見た目を HTML の <details>タグで隠しているだけなのですが、思考中も回答も全てストリーミングするのでいい感じです。

Open WebUI 「も」使っている方は、Open WebUI の Reasoning Tag を以下の様にしてください。

Start Tag: <details><|channel|>analysis<|message|>

End Tag: <|end|><|start|>assistant<|channel|>final<|message|></details>

以上で設定は終わりです、お疲れ様でした。後は雑記ですので、読むか読まないかはあなた次第です。

gpt-oss または Qwen3 A30B 等のローカル LLM を使って解決した?

いいえ。それぞれの LLM の性能を試す良い機会だと思ったのですが、gpt-oss は<|channel|>等のタグがチャット内で表示されず、Qwen3 A30B は<details>タグで生成されたテキストの所々が「▶詳細」の中にしまわれたりコードと回答部分がごちゃごちゃになるような状況でなかなかスムーズに進められず、ある程度いじってからこれらのローカルモデルを使った解決は諦めました。

結局解決したときに使っていたのはまたもや無料版の Gemini (2.5 Flash と Pro) でした。ただしコンテキスト長の問題で元の server.py のコード全ては読み込んでくれなかったので、変更が必要な部分の特定には 262,144 (256K) トークンものコンテキスト長が使える Qwen3 30B A3B を使いました。変更が必要なメソッドの特定ができたので、そのメソッド全部と、やりたいこと、MLX_LM サーバの Debug ログ、注意点、等を投げて、変更後のメソッド全部を吐き出してもらいました。結果、一発で動くものができたので、やっぱすごいっすね、Gemini。

せっかくなので、Gemini に投げたプロンプトの、該当メソッド以外の部分を貼っておきます。興味のある方は覗いてみてください (プロンプトでは「クラス」と言っていますがメソッドですね)。

Python のコードを渡すので変更をお願いします。コードは LLM の API サーバの一部で、ユーザからのプロンプトを受け、LLM の生成するトークンをストリーミングでクライアントに返します。
使用する LLM は、以下のフォーマットで API クライアントにレスポンスします。
```
<|channel|>analysis<|message|>The user asks in Japanese: "2+2は?" meaning "What is 2+2?" The answer is 4. Should respond in Japanese.<|end|><|start|>assistant<|channel|>final<|message|>4です。
```
この例では、"4です。"のみが最終的にユーザが欲しい回答なので、その前を全て`<details>`と`</details>`タグで囲むのがゴールです。つまり、希望する出力は以下となります:
```
<details><|channel|>analysis<|message|>The user asks in Japanese: "2+2は?" meaning "What is 2+2?" The answer is 4. Should respond in Japanese.<|end|><|start|>assistant<|channel|>final<|message|></details>4です。
```
サーバのログのサンプルは以下の通りです。`<|channel|>`, `analysis`, `<|message|>`, `final` それぞれが 1トークンのようです。
```
2025-09-03 23:34:28,637 - DEBUG - <|channel|>
2025-09-03 23:34:28,648 - DEBUG - analysis
2025-09-03 23:34:28,660 - DEBUG - <|message|>
2025-09-03 23:34:28,672 - DEBUG - The
2025-09-03 23:34:28,684 - DEBUG -  user
2025-09-03 23:34:28,695 - DEBUG -  asks
2025-09-03 23:34:28,707 - DEBUG -  in
2025-09-03 23:34:28,719 - DEBUG -  Japanese
2025-09-03 23:34:28,731 - DEBUG - :
2025-09-03 23:34:28,743 - DEBUG -  "
2025-09-03 23:34:28,755 - DEBUG - 2
2025-09-03 23:34:28,767 - DEBUG - +
2025-09-03 23:34:28,778 - DEBUG - 2
2025-09-03 23:34:28,790 - DEBUG - は
2025-09-03 23:34:28,802 - DEBUG - ?
2025-09-03 23:34:28,816 - DEBUG - "
2025-09-03 23:34:28,828 - DEBUG -  meaning
2025-09-03 23:34:28,840 - DEBUG -  "
2025-09-03 23:34:28,851 - DEBUG - What
2025-09-03 23:34:28,863 - DEBUG -  is
2025-09-03 23:34:28,875 - DEBUG - 
2025-09-03 23:34:28,887 - DEBUG -  2
2025-09-03 23:34:28,899 - DEBUG - +
2025-09-03 23:34:28,910 - DEBUG - 2
2025-09-03 23:34:28,922 - DEBUG - ?"
2025-09-03 23:34:28,934 - DEBUG -  The
2025-09-03 23:34:28,946 - DEBUG -  answer
2025-09-03 23:34:28,958 - DEBUG -  is
2025-09-03 23:34:28,969 - DEBUG - 
2025-09-03 23:34:28,981 - DEBUG -  4
2025-09-03 23:34:28,993 - DEBUG - .
2025-09-03 23:34:29,005 - DEBUG -  Should
2025-09-03 23:34:29,017 - DEBUG -  respond
2025-09-03 23:34:29,028 - DEBUG -  in
2025-09-03 23:34:29,040 - DEBUG -  Japanese
2025-09-03 23:34:29,052 - DEBUG - .
2025-09-03 23:34:29,064 - DEBUG - <|end|>
2025-09-03 23:34:29,075 - DEBUG - <|start|>
2025-09-03 23:34:29,087 - DEBUG - assistant
2025-09-03 23:34:29,099 - DEBUG - <|channel|>
2025-09-03 23:34:29,111 - DEBUG - final
2025-09-03 23:34:29,123 - DEBUG - <|message|>
2025-09-03 23:34:29,135 - DEBUG - 4
2025-09-03 23:34:29,147 - DEBUG - です
2025-09-03 23:34:29,159 - DEBUG - 。
2025-09-03 23:34:29,172 - DEBUG - 
```

コードを変更するときの注意点は以下となります:
1. 最小限の変更にとどめる
2. ストリーミングするトークンを必要なだけバッファし、`<|channel|>analysis<|message|>`が含まれたら`<details><|channel|>analysis<|message|>`に変更して API クライアントへ送る
3. 新たなトークンを受け取ったら先頭のトークンから API クライアントへ送る
4. バッファに`<|channel|>final<|message|>`が現れたら、`<|channel|>final<|message|></details>`に変更して API クライアントへ送る
5. 同じコードで他の LLM のストリーミングにも対応するため、もし先頭からのバッファに`<|channel|>`が含まれなければその時点でバッファをやめ、残りは全てストリーミングで API クライアントへ送る

以下が`server.py`の該当クラス部分です。こちらに変更を加えて上記を実現してください。

Gemini の回答は以下で、その後にdef handle_completion(からつづくメソッドが表示されました。上記の server.py はほぼそのままのコードを反映しています。

はい、承知いたしました。ご指定の要件に基づき、LLMからのレスポンスを整形するロジックを組み込んだserver.pyのhandle_completionメソッドのコードを以下に提示します。

変更点は、元のコードの構造を維持しつつ、ストリーミング処理の部分にトークンをバッファリングし、特定のシーケンスを検出して<details>および</details>タグを挿入するロジックを追加した点です。これにより、他のLLMのレスポンス形式に影響を与えることなく、目的のフォーマットにのみ対応できます。

変更後の server.py

以下が、handle_completionメソッドの変更後のコードです。変更・追加された箇所にはコメントを付記しています。

ところで一ヶ月以上前から Safari では Dify のチャットボットが使えなくなった

正確に言うと、使っているうちに履歴が見れなくなります。新しいバージョンの Dify ではもう少し細かいエラーを吐きますが、”Conversation not exists.” がブラウザの右上に表示されて、それまでの履歴が見れなくなるという症状です。オーナーとして Dify にログインして AI アプリを作るのは問題無いです。

新しい Dify のもう少し細かいエラー:

Conversation Not Exists. You have requested this URI [/api/messages] but did you mean /api/messages or /api/chat-messages or /api/saved-messages ?

macOS 自体 (含む Safari) や Dify のアップデートもあったので、それらのどれかまたは組み合わせが原因だと思いますが、1ヶ月以上経っても解決できずにいます。結構な時間を使って色々試したのですがどうにも小手先で解決できそうになく、Dify のチャットボットは Chrome で使っています。

でもできれば Chrome 使いたくないなーというわけで、Safari で問題無く動く Open WebUI を使うようになったことが本記事の「正攻法」に気がつくきっかけとなりました。

Image by Stable Diffusion (Mochi Diffusion)

「マッドサイエンティストが作ったサイボーグ」本記事の内容はここまでの魔改造ではないですが、一番破綻がなかったのでこれを採用。絵が怖いから Google さんはまたインデックスしてくれなかったりするのかな

Date:
2025年9月5日 1:48:33

Model:
realisticVision-v51VAE_original_768x512_cn

Size:
768 x 512

Include in Image:
mad scientist looking at a super cyborg

Exclude from Image:

Seed:
3303002455

Steps:
20

Guidance Scale:
20.0

Scheduler:
DPM-Solver++

ML Compute Unit:
CPU & GPU

MLX が MXFP4 量子化に対応して gpt-oss がますます速くなった!

Mac でローカル LLM やるなら MLX/MLX-LM を使おう!と推し推しキャンペーン中ですが、つい先日 MXFP4 量子化に対応したバージョンがリリースされ、gpt-oss がますます速くなりました。すんばらしい!

Dify や Open WebUI で使う方法も書いています。

gpt-oss-20b-MXFP4-Q8 を Dify で使った時。ヤバー!「10 t/s あれば使える」とか言っていた頃が懐かしい

(2026/02/02 追記) 多分どんなクライアントでも動くであろう gpt-oss 用プロキシサーバを書きました。MLX-LM じゃなくてもイケるんじゃなかろうか ↓

環境、バージョン、モデル

※ どれも本日時点で最新バージョンですが、Open WebUI と Dify はまだ gpt-oss の harmony response format に対応していないので、そのままでは動きません。ボクは MLX-LM のサーバスクリプトに変更を加えることで動かしています (やり方は以下)。LM Studio を使っている人は読む必要の無い内容となっています。

MLX-LM 環境の作り方はこちら:

いきなりですが mlx-community の Q8 は本当に Q8 なのかわからん

細かい技術的なところはよくわからないのでよくわかっている人が書いている記事を読んで欲しいのですが、ボクにとって何がよくわからんって、Q8 のサイズが Q4 とほとんど変わらないってところです。わかりやすいので LM Studio で表示されるサイズを以下にまとめました。下の表は全て mlx-community で MXFP4 量子化されたものです。

mlx-community モデルファイルサイズ
gpt-oss-120b-MXFP4-Q863.41 GB
gpt-oss-120b-MXFP4-Q462.36 GB
gpt-oss-20b-MXFP4-Q812.10 GB
gpt-oss-20b-MXFP4-Q411.21 GB

比較対象として、他のコントリビュータによるこれまでの MLX 量子化のサイズをまとめました。変換時のオプションで量子化後のサイズに多少変化があるのかも知れませんが、Q8 (8bit quant) と Q4 (4bit quant) の間には大きな差 (20b で 9GB) があるのがわかります。

モデルファイルサイズ
lmstudio-community/gpt-oss-120b-MLX-8bit124.20 GB
nightmedia/gpt-oss-120bq4-hi-mlx73.10 GB
lmstudio-community/gpt-oss-20b-MLX-8bit22.26 GB
nightmedia/gpt-oss-20bq4-hi-mlx13.11 GB

LM Studio の Model Directory でダウンロード済みの gpt-oss-20b-MXFP4-Q8 を見ると Quant が4bitとなっています。これが、MXFP4 を指しているのか、Hugging Face で誰かが質問しているように、mlx-community が間違って Q4 を Q8 としてアップロードしてしまったのかはわかりません。そのうちバリエーションがアップされるでしょうから、そのときに何が正しいのかがわかりそうです。いずれにせよ MXFP4 では Q8 は Q4 より 1 GB 程度しか大きくないので、動かせる環境なら Q8 でいいじゃん、という判断ができますよね。4-bit の MXFP4 をさらに 8-bit に量子化する (劣化したものを高精細化する!?) のは意味が無い、という見解もあるので、MXFP4 であれば 8-bit も 4-bit も性能・サイズともに性能に違いはほぼ無い、ということなのかなと想像しています。

性能的なところに少し触れると、2日ほど日本語で 20b-MXFP4-Q8 を使ってきた感覚では、旧 MLX の 6.5bit と比較して問題を感じません。20b-MXFP4-Q8 のサイズが 12.10 GB なのに対し、旧 MLX の 6.5bit は 17.03 GB です。

少ない性能劣化で速度も出る

同じ量子化技術を使う場合、ビット数が小さいほど (Q8 より Q4 等) 劣化し元々の性能が出なくなります。その代わりファイルサイズが抑えられるので小さな VRAM で実行でき、結果として処理速度が速くなります。一般的に Q8 は性能の劣化の少なさと処理速度向上のバランスが良いためよく使われています。

MXFP4 の場合、上記の通り Q8 で性能をほぼ維持したままファイルサイズが半分ほどになっているわけです。単純にその点だけ考えても高速で動くことがわかりますね。素の MLX 版でも速かったのに、速読スキルがないと目では追えないレベルです。

LLM の性能の良し悪しはなかなか評価が難しいですが、比較サイトなどでわずかな差で 32B の他のモデルの性能が高いと評価されていたとしましょう。しかし手持ちの VRAM 容量や使用したいコンテキスト長の絡みで 32B モデルは Q4 以下の量子化が必要ということであれば、実際の性能差は近づく可能性が高いです。ものによっては逆転の可能性もあるかもしれません。

MLX の強みのひとつ: コンテキスト長の調整が不要

MXFP4 自体のメリットではないのですが、LLM 自体のサイズが小さくなればそれだけ VRAM の空きに余裕ができるので、大きなコンテキストサイズを扱うことができます。Ollama は使用するコンテキスト長を指定してチャットを始める必要があるので、大きなサイズを扱いたいときは事前に計算したりして攻めたサイズを指定してあげる必要があります。これを間違うと部分的に CPU で処理がされることになり、大きな速度低下の原因になります。

MLX で LLM を扱う際の大きな魅力のひとつが、コンテキスト長の指定が不要 (ダイナミック) というところだと思っています。モデルが扱えるサイズの上限を入れておけばよく、もちろん VRAM サイズを超えたときには動作が遅くなりますが、そのときまでは気にせず使えます (遅くなったら新しいチャットを始めるの精神)。コンテキストが小さい間は Mac のメモリ使用量自体が抑えられているので安心感があります。

以下は Ollama のコンテキスト長関連の記事です。Ollama の速度でお悩みの方はどうぞ。

日本語: Ollama 単体では速い LLM が、なぜか Dify や Continue から使うと遅い、という時の解決方法

日本語: Ollama の高速化と VRAM 最適化設定 (ファインチューニング: 2)

API クライアントからムリヤリ使う方法

前回の記事でも書いたのですが、gpt-oss の Harmony Response Format に対応していない LLM フロントエンド/API クライアント (Open WebUI や Dify 等) で gpt-oss を使うと、2回目のチャットでエラーが出ます。本来はクライアント側で思考部分を API サーバに送らない、というのが正解なのですが、とりあえず MLX-LM のサーバスクリプト側で受け取った思考部分を切り捨てるという方法で対処可能です。どの API クライアントでも使えるので、正式対応されるまではしのげると思います。

ただ、MLX-LM のバージョンアップと共にサーバスクリプト (server.py) も変更されたため、前回の記事で紹介したスクリプトはそのままでは動きません (試してませんが無理でしょう)。

というわけで、以下に mlx-lm 0.27.0 に対応したコードを貼っておきます。

直下は変更前のserver.pyの対象箇所です。念のためcp server.py server.py.original等として元のファイルを保存してから作業を進めてください。

ハイライトされている、862行目が書き換えの対象です (前後にコードを挟みます)。

        if self.tokenizer.chat_template:
             messages = body["messages"]                                                                                                                                                                               
             process_message_content(messages)
             prompt = self.tokenizer.apply_chat_template(
                 messages,
                 body.get("tools") or None,
                 add_generation_prompt=True,
                 **self.model_provider.cli_args.chat_template_args,
             )

以下に置き換えます。


             # --- Changes from here ---
             # Modify message based on the `mlx-lm` chat template.
             for message in messages:
                 if message["role"] == "assistant":
                     content = message.get("content", "")
                     if "<|channel|>analysis<|message|>" in content and "<|channel|>final<|message|>" in content:
                         try:
                             analysis_start_tag = "<|channel|>analysis<|message|>"
                             analysis_end_tag = "<|end|>"
                             final_start_tag = "<|channel|>final<|message|>"
 
                             analysis_start = content.find(analysis_start_tag) + len(analysis_start_tag)
                             analysis_end = content.find(analysis_end_tag)
                             final_start = content.find(final_start_tag) + len(final_start_tag)
 
                             analysis = content[analysis_start:analysis_end].strip()
                             final = content[final_start:].strip()
 
                             message["content"] = final
                             message["thinking"] = analysis
                         except Exception as e:
                             logging.error(f"Failed to parse assistant message with analysis/final tags: {e}")
                             # If parsing fails, leave the content and empty thinking
                             message["thinking"] = ""
             # --- to here ---
 
             process_message_content(messages)
 
             # Moved response_format before `apply_chat_template`
             if body.get("response_format", {}).get("type") == "json_object":
                 if self.tokenizer.chat_template is None:
                     raise ValueError("JSON response format requested, but tokenizer has no chat template. Consider using `--use-default-chat-template`")
                 messages.append({"role": "user", "content": self.tokenizer.json_schema_prompt})

Diff 形式でも貼っておきます。

--- server.py.v0270.original	2025-08-31 00:48:31
+++ server.py	2025-08-31 00:54:51
@@ -861,3 +861,37 @@
             messages = body["messages"]
+
+            # --- Changes from here ---
+            # Modify message based on the `mlx-lm` chat template.
+            for message in messages:
+                if message["role"] == "assistant":
+                    content = message.get("content", "")
+                    if "<|channel|>analysis<|message|>" in content and "<|channel|>final<|message|>" in content:
+                        try:
+                            analysis_start_tag = "<|channel|>analysis<|message|>"
+                            analysis_end_tag = "<|end|>"
+                            final_start_tag = "<|channel|>final<|message|>"
+
+                            analysis_start = content.find(analysis_start_tag) + len(analysis_start_tag)
+                            analysis_end = content.find(analysis_end_tag)
+                            final_start = content.find(final_start_tag) + len(final_start_tag)
+
+                            analysis = content[analysis_start:analysis_end].strip()
+                            final = content[final_start:].strip()
+
+                            message["content"] = final
+                            message["thinking"] = analysis
+                        except Exception as e:
+                            logging.error(f"Failed to parse assistant message with analysis/final tags: {e}")
+                            # If parsing fails, leave the content and empty thinking
+                            message["thinking"] = ""
+            # --- to here ---
+
             process_message_content(messages)
+
+            # Moved response_format before `apply_chat_template`
+            if body.get("response_format", {}).get("type") == "json_object":
+                if self.tokenizer.chat_template is None:
+                    raise ValueError("JSON response format requested, but tokenizer has no chat template. Consider using `--use-default-chat-template`")
+                messages.append({"role": "user", "content": self.tokenizer.json_schema_prompt})
+
             prompt = self.tokenizer.apply_chat_template(

思考部分は丸見えのままですが、これで繰り返しのチャットが可能になります。

感想

とにかくスピードがあるので魅力的ですよね。ドラクエの武道家みたいな。一発一発の攻撃力 (能力) が多少低くても手数で勝負できる感じでしょうか。

120b の Q8 でも 64GB なら、96GB のユニファイドメモリで足りるんですよね。へー、そうかー、と整備済製品ページを眺めてはため息ですよ。

Image by Stable Diffusion (Mochi Diffusion)

単純に「武道家」からイメージして生成。宮平保先生みたいな武術家のイメージだったので、素手の組み手の画像を選択。どうせボク自身が細かいビジョンを持っていないので、最小限の単語の組み合わせが良い結果を生むと理解しつつあります。

Date:
2025年9月1日 1:04:44

Model:
realisticVision-v51VAE_original_768x512_cn

Size:
768 x 512

Include in Image:
fastest martial art fighter

Exclude from Image:

Seed:
199077246

Steps:
20

Guidance Scale:
20.0

Scheduler:
DPM-Solver++

ML Compute Unit:
CPU & GPU

gpt-oss を MLX-LM の API サーバで Dify や Open WebUI からムリヤリ使う方法

(2026/02/02 追記) さらに、もっとスマートな方法であろう、プロキシサーバを書きました。クライアントもサーバも選ばず gpt-oss を動かせるんじゃないかと思います ↓

新しく MXFP4 に対応した MLX/MLX-LM に関する記事を書いたので、そちらをご覧ください。

先日アップした記事では、MLX バージョンの gpt-oss を MLX-LM の API サーバで動かすと Dify や Open WebUI 等では正しく動作しないと書きましたが、server.pyに変更を加えることでチャットだけはできるようになりました。手元の環境では動いていますが、ビビって MLX-LM には PR せずに Issue だけあげてあります (↓ 2コ)。本家が解決したら不要になる情報ですがせっかくなので共有します。

https://github.com/ml-explore/mlx-lm/issues/364 → (2025/08/15 加筆) 修正がコミットされたようなので、近くバージョンアップで修正されそうです

https://github.com/ml-explore/mlx-lm/issues/365 → (2025/08/15 加筆) 「クライアント側で対応すべき内容」と言うことでクローズされました

フォークした repo (↓) には変更済みのserver.pyを置いてあるので、よかったらどうぞ。Dify や Open WebUI 等の API クライアント側の対応を待たずに MLX-LM で gpt-oss が動きます。

https://github.com/tokyohandsome/mlx-lm

前回の記事はこれ:

各種バージョン等

Issue に書いてますが一応。

  • Open Web UI: v0.6.20
  • Dify: 1.7.1
% pip list|grep mlx
mlx 0.28.0
mlx-lm 0.26.3
mlx-metal 0.28.0
%
% python -V
Python 3.12.11

不具合の内容

詳細は上の issue を見てもらいたいのですが、ざっくり以下の内容です:

  1. LLM からのレスポンスが途中で終わってしまう: 何か制御コードみたいなものが含まれているのかと思ったら、ただのカラ文字が原因だった感じです。カラ文字は送らないようにしたら動くようになりました。トークナイザの不具合?
  2. チャットで 2つ目のプロンプトを投げるとサーバでエラーが発生する: 本来 API クライアント側で<|channel|>から<|message|>の思考部分をサーバに送り返さないのが正解だと思います (なので MLX-LM では上記 issue は対応無し)。ただまぁボクの場合、ローカルで動かしているだけなので、サーバで該当部分を捨ててしまうようにしました。

素人目にはそんなに根が深いわけではなさそうなので、かなり近いうちに修正されるんじゃないかと思ってます。(2025/08/15 追記) 1. は新しい MLX-LM のバージョンで修正されそうです。2. はクライアント側での対応が必要です。

手っ取り早く使うには

上記の通り現状 MLX-LM では gpt-oss 個別の対応はされないようで、Open WebUI や Dify などの API クライアント側での MLX 版 gpt-oss 対応を待たなければならないようです。ボクのように MLX の速さやにとりつかれていて API で MLX 版を使いたいという人は、server.pyだけ上書きして使ってみてください。mlxmlx-lmmlx-metalのバージョンは上記と合わせたほうが良いと思います。

仮想環境にクローンして使うならこんな感じです (ポートなどはご自由に)。

git clone https://github.com/tokyohandsome/mlx-lm
pip install -r requirements.txt
python -m mlx_lm.server --host 0.0.0.0 --port 9999 --log-level DEBUG

本家の MLX-LM を入れて、ボクがいじったserver.pyだけを差し替える方法も参考として貼っておきます。pipenvを使ってますがお使いの仮想環境でどうぞ:

mkdir gpt-oss_mlx-lm
cd gpt-oss_mlx-lm
pipenv --python 3.12 # 3.8 以上なら OK
pipenv shell
pip install mlx==0.28.0 mlx-lm==0.26.3

# インストールされたバージョンの確認
pip list|grep mlx

# 元のファイルを .original としてコピーしておく
mv .venv/lib/python3.12/site-packages/mlx_lm/server.py .venv/lib/python3.12/site-packages/mlx_lm/server.py.original
curl https://raw.githubusercontent.com/tokyohandsome/mlx-lm/refs/heads/main/mlx_lm/server.py -o .venv/lib/python3.12/site-packages/mlx_lm/server.py

上の最後の 2行で元のファイルをserver.py.originalとして保存し、変更済みのserver.pyをダウンロードしています。これで準備完了です。

以下コマンドで OpenAI API コンパチの MLX-LM サーバが起動します (例ではポート9999)。

mlx_lm.server --host 0.0.0.0 --port 9999 --log-level DEBUG

Open WebUI 等から gpt-oss に接続し、Terminal に流れるトークン全てが表示され、2回目以降もチャットが続けられれば成功です!

MLX-LM API Server のモデルを Open WebUI や Dify から使う方法は別記事に詳しく書いていますのでどうぞ:

ついでにserver.pyの変更箇所 (diff) を貼っておきます。+の行にあるのが今回追加した部分です:

diff .venv/lib/python3.12/site-packages/mlx_lm/server.py.original .venv/lib/python3.12/site-packages/mlx_lm/server.py
--- .venv/lib/python3.12/site-packages/mlx_lm/server.py.original	2025-08-15 21:05:24
+++ .venv/lib/python3.12/site-packages/mlx_lm/server.py	2025-08-15 21:15:34
@@ -694,2 +694,8 @@
             logging.debug(gen_response.text)
+
+            # --- Added from here ---
+            if not gen_response.text:
+                logging.debug("Skipping empty token.")
+                continue
+            # --- to here ---
 
@@ -837,3 +843,37 @@
             messages = body["messages"]
+
+            # --- Changes from here ---
+            # Modify message based on the `mlx-lm` chat template.
+            for message in messages:
+                if message["role"] == "assistant":
+                    content = message.get("content", "")
+                    if "<|channel|>analysis<|message|>" in content and "<|channel|>final<|message|>" in content:
+                        try:
+                            analysis_start_tag = "<|channel|>analysis<|message|>"
+                            analysis_end_tag = "<|end|>"
+                            final_start_tag = "<|channel|>final<|message|>"
+
+                            analysis_start = content.find(analysis_start_tag) + len(analysis_start_tag)
+                            analysis_end = content.find(analysis_end_tag)
+                            final_start = content.find(final_start_tag) + len(final_start_tag)
+
+                            analysis = content[analysis_start:analysis_end].strip()
+                            final = content[final_start:].strip()
+
+                            message["content"] = final
+                            message["thinking"] = analysis
+                        except Exception as e:
+                            logging.error(f"Failed to parse assistant message with analysis/final tags: {e}")
+                            # If parsing fails, leave the content and empty thinking
+                            message["thinking"] = ""
+            # --- to here ---
+
             process_message_content(messages)
+
+            # Moved response_format before `apply_chat_template`
+            if body.get("response_format", {}).get("type") == "json_object":
+                if self.tokenizer.chat_template is None:
+                    raise ValueError("JSON response format requested, but tokenizer has no chat template. Consider using `--use-default-chat-template`")
+                messages.append({"role": "user", "content": self.tokenizer.json_schema_prompt})
+
             prompt = self.tokenizer.apply_chat_template(

それでもまだ LM Studio が優れているところ

というわけでムリヤリながら Dify や Open WebUI でも MLX 版 gpt-oss でチャットができるようになったわけですが、OpenAI 社が推奨する思考部分をユーザから隠すということができません。そこは正式対応済みの LM Studio が勝っていますね。Dify や Open WebUI も Qwen/Qwen3-32B-MLX-4bit なんか使ってると思考部分は隠せているので、gpt-oss (というか Harmony response format) の正式対応が進んでくれたらいいな、と思っています。

単純に思考部分を完全に見えなくするだけであれば、どうせ今回紹介している方法では乱暴にオリジナルのスクリプトを書き換えて使っているので、server.py<|channel|>から<|message|>までのメッセージをクライアントに返さないように改造してしまっても良いかもしれません。

ところで今回どうやって直したか、とか

せっかくなので LM Studio で gpt-oss を動かして協力してもらいながら解決まで持って行きたかったんですが、テストするときには MLX-LM でも gpt-oss をロードする事になりメモリキャパオーバによるクラッシュの危険性が高いので避けました。で、ChatGPT に相談を始めたものの全然解決に近づいている感じがなく時間ばかりがかかりギブアップ。次に Gemini (2.5 Flash) に相談し始めてからはほぼ最短コースで解決にたどり着いた感じです。この時には質問方法や内容に慣れて、深掘りすべきところにもある程度見当が付いてきたこともあったとは思いますが、Gemini を見直しました。

質問の時には、使っている環境、症状の詳細、関係している可能性が高い Python スクリプト全体 (server.py)、サーバのエラー、クライアント (Dify や Open WebUI) のエラー、等を詳細に伝えることで解決できた感じです。ChatGPT はコードの修正をお願いすると全く違うものが出てきたりして使えなかったです。もしかしたら動いたのかも知れませんがとても pull request には使えないものだったので (そういう意味では gpt-oss もそういう用途では使えないのかな)。Gemini は最小限の追加で、コードを差し込むところの説明含め正確でした。

余談ですが、最近プログラマ不要論みたいなのがありますよね。生成 AI で置き換え可能、とかなんとか。確かに最近は 20B~30B 程度のサイズの LLM でもざっくりとしたプロンプトから一発でブロック崩しゲームを書いてくれたりしますが、狙ったとおりの変更やバグの修正などを上手に行うにはプログラムの知識は必要だと思いますね。

おまけ: gpt-oss-20B と Qwen3 30B A3B の SVG 対決

プロンプト: SVG で UFO が牛をさらっている画像を作ってください

(貼ったのは PNG にしたものです)

まずは inferencerlabs/openai-gpt-oss-20b-MLX-6.5bit

文章で説明するのはズルイ。ま、やっちゃダメとも言わんかったか

次に nightmedia/Qwen3-30B-A3B-Thinking-2507-dwq4-mlx

雰囲気がヨイ!けど人さらってますね

現場からは以上となります!

Image by Stable Diffusion (Mochi Diffusion)

リンゴに絆創膏、というイメージで書いてもらいました。バンドエイドは商標ですが、全くそう見えないものができたのでセーフと自己判断して採用。そろそろリンゴ以外を使った方がいいかもと思いつつも結局こんな感じで、生成 AI ばかり使いすぎて頭がアレになってきた人の特徴でしょうかね。

Date:
2025年8月10日 23:07:43

Model:
realisticVision-v51VAE_original_768x512_cn

Size:
768 x 512

Include in Image:
small band-aid patches on a red apple

Exclude from Image:

Seed:
1709363568

Steps:
21

Guidance Scale:
20.0

Scheduler:
DPM-Solver++

ML Compute Unit:
CPU & GPU

OpenAI gpt-oss はまだ Mac の MLX-LM と Dify や Open WebUI では正しく動かない (対処法あり)

(2026/02/02 追記) MLX-LM は全くいじらずに gpt-oss をスマートに扱うプロキシサーバを書きました。どんな環境でも動きそうな気がします ↓

(2025/09/16 追記) MLX-LM のサーバスクリプトをさらに書き換えて Dify や Open WebUI から Reasoning: High とかもできるようにしました。記事はこちら:

(2025/08/10 追記) MLX-LM のサーバスクリプトを書き換えて Dify や Open WebUI から使えるようにしました。記事はこちら:

ここ 2日程ローカル LLM 界隈で大騒ぎの OpenAI 初のオープンウェイト大規模言語モデル (LLM) gpt-oss。32GB RAM の Mac Studio で MLX 版 20b モデルを試したところ、Dify と Open WebUI では正しく動きませんでした。思考を思考として正しく認識されず全てが垂れ流しされ、MLX-LM から送られてくる制御文字?か何かで出力が途中で止まります。

Hugging Face から落とした同じモデルを使って、LM Studio (最新の 0.3.22 Build 2) では正しく動作しています。Ollama も対応を表明してますがまだ MLX バックエンドは使えないので、2025年 8月 7日現在限定で言えば、Mac で gpt-oss 使う最適解は LM Studio っぽいですね。

(2025/08/08 追記) LM Studio でサーバを動かし OpenAI-API-compatible のモデルプロバイダとして gpt-oss を登録したところ、Dify でも使えました!思考は丸見えですが、それ以外は問題無さそうです。Python でパーフェクトメイズを作るスクリプトは一発でした。その後色々変更を依頼しても毎回正しく動くスクリプトが生成されます。Dify で見えるトークン出力速度は 70 tok/sec を超えています。ヤバい。

使ったモデル

https://huggingface.co/inferencerlabs/openai-gpt-oss-20b-MLX-6.5bit

「6.5bit はほぼ 8bit と同等の性能 (perplexity)」と書かれてあったので、真に受けて選択。同ページにあるように、VRAM が 17 GB 確保できれば動きます。つまり 32GB 以上の RAM を持った Mac ならそのままで動く計算ですが、VRAM 容量を最適化するには別記事 (↓) をどうぞ。

https://blog.peddals.com/fine-tune-vram-size-of-mac-for-llm
↓ が英語ページの場合は、↑ を開いてください

MLX-LM と MLX のバージョンについての注意点

MLX-LM をバックエンド (OpenAI API コンパチブルサーバ) として使う場合は gpt-oss に対応したバージョン 0.26.3 以上が必要になります。インストール済みの環境で使う場合はアップデートしましょう。

pip install -U mlx-lm

MLX は、新規でインストールする場合は問題ないですが、すでに 0.26.5 より古いバージョンが入っていると、そのままアップデートすると動かなくなります。やっちゃった場合は一度削除してから、再度インストールしましょう。ボクはここで若干ハマりました。

pip uninstall mlx
pip uninstall mlx-metal # うっかり 0.26.5 より古いバージョンからアップデートして入ってしまった場合はアンインストール
pip install mlx mlx-metal

情報源はこちらの issue です:

https://github.com/ml-explore/mlx/issues/2402

2025年 8月 7日現在の最新バージョンはこうなります。

% pip list|grep mlx
mlx 0.28.0
mlx-lm 0.26.2
mlx-metal 0.28.0

参考まで、mlxをアップデートしておかしくなった際にサーバを起動しようとして出たエラーを貼っておきます。同じようにlibmlx.dylib' (no such file)が出た場合は上記のmlxのアンインストール&インストールを実行しましょう。

% mlx_lm.server --host 0.0.0.0 --port 8585 --log-level DEBUG
Traceback (most recent call last):
File "/Users/handsome/Documents/Python/mlx-lm/.venv/bin/mlx_lm.server", line 5, in <module>
from mlx_lm.server import main
File "/Users/handsome/Documents/Python/mlx-lm/.venv/lib/python3.12/site-packages/mlx_lm/__init__.py", line 9, in <module>
from .convert import convert
File "/Users/handsome/Documents/Python/mlx-lm/.venv/lib/python3.12/site-packages/mlx_lm/convert.py", line 7, in <module>
import mlx.core as mx
ImportError: dlopen(/Users/handsome/Documents/Python/mlx-lm/.venv/lib/python3.12/site-packages/mlx/core.cpython-312-darwin.so, 0x0002): Library not loaded: @rpath/libmlx.dylib
Referenced from: <8B6A45F7-00BF-3CEA-9AFF-CD76D4BC76F0> /Users/handsome/Documents/Python/mlx-lm/.venv/lib/python3.12/site-packages/mlx/core.cpython-312-darwin.so
Reason: tried: '/Users/handsome/Documents/Python/mlx-lm/.venv/lib/python3.12/site-packages/mlx/lib/libmlx.dylib' (no such file), '/Users/distiller/project/build/temp.macosx-14.0-arm64-cpython-312/mlx.core/libmlx.dylib' (no such file), '/System/Volumes/Preboot/Cryptexes/OS/Users/distiller/project/build/temp.macosx-14.0-arm64-cpython-312/mlx.core/libmlx.dylib' (no such file), '/Users/handsome/Documents/Python/mlx-lm/.venv/lib/python3.12/site-packages/mlx/lib/libmlx.dylib' (no such file), '/Users/distiller/project/build/temp.macosx-14.0-arm64-cpython-312/mlx.core/libmlx.dylib' (no such file), '/System/Volumes/Preboot/Cryptexes/OS/Users/distiller/project/build/temp.macosx-14.0-arm64-cpython-312/mlx.core/libmlx.dylib' (no such file), '/opt/homebrew/lib/libmlx.dylib' (no such file), '/System/Volumes/Preboot/Cryptexes/OS/opt/homebrew/lib/libmlx.dylib' (no such file), '/opt/homebrew/lib/libmlx.dylib' (no such file), '/System/Volumes/Preboot/Cryptexes/OS/opt/homebrew/lib/libmlx.dylib' (no such file)

mlx-lmのバージョンが古い場合は、Dify で gpt-oss を追加する時にERROR - Model type gpt_oss not supportedが出ます。こちらもエラーを貼っておきます。

/Users/handsome/Documents/Python/mlx-lm/.venv/lib/python3.12/site-packages/mlx_lm/server.py:934: UserWarning: mlx_lm.server is not recommended for production as it only implements basic security checks.
warnings.warn(
2025-08-07 18:29:02,619 - INFO - Starting httpd at 0.0.0.0 on port 8585...
2025-08-07 18:29:19,719 - DEBUG - Incoming Request Body: {
"model": "inferencerlabs/openai-gpt-oss-20b-MLX-6.5bit",
"max_tokens": 5,
"messages": [
{
"role": "user",
"content": "ping"
}
]
}
2025-08-07 18:29:19,725 - DEBUG - Starting new HTTPS connection (1): huggingface.co:443
2025-08-07 18:29:19,998 - DEBUG - https://huggingface.co:443 "GET /api/models/inferencerlabs/openai-gpt-oss-20b-MLX-6.5bit/revision/main HTTP/1.1" 200 18528
Fetching 11 files: 100%|███████████████████████████████████████████████████████████████████████████████████████████| 11/11 [00:00<00:00, 223967.69it/s]
2025-08-07 18:29:20,048 - ERROR - Model type gpt_oss not supported.
192.168.111.71 - - [07/Aug/2025 18:29:20] "POST /v1/chat/completions HTTP/1.1" 404 -

その他、MLX-LM を LLM のバックエンドとして使う方法は別記事に書いていますので読んでみてください。Mac で速度と LLM がサポートする大きなコンテキストウィンドウを確保するなら MLX-LM が正解です。

MoE の生成速度のたまらなさ

上記したとおり今日現在 LM Studio でしか正しい動きを確認できていませんが、たまたま最近 Alibaba がリリースした Qwen3 Coder 30B A3B Instruct も gpt-oss 同様 MoE という仕組みで動いています。MoE の細かい内容は他のサイトなどを見てもらうとして、ユーザ目線での最大のメリットは、生成速度の速さです。自分の Mac で動くローカル LLM が、ChatGPT や Gemini 等のクローズド商用モデル同等の速度で文字を生成していく様は、ある意味感動的でもあります。

実は最近、Reasoning/Thinking モデルの精度優先でゆっくりとした生成速度に慣れきった頃に触った Qwen3 Coder 30B A3B Instruct (MoE) の高速生成に感動し何か記事を書こうとしていました。ですが実際に生成されるコードの精度自体がイマイチだったのでどうするか思案していたところ、まさかの OpenAI から gpt-oss がリリースされたのでした。gpt-oss は M2 Max でも 50 token/sec 以上 (!) のスピードでリッチな内容と文字装飾でレスポンスが生成されてくるので、マジたまらないですよ。

まとめ

MLX と Dify や Open WebUI で使えた!とか、使えなかった!という情報が見当たらなかったので、今のところ使えませんでした!という内容でまとめました。 → 使えるようにする方法は、別記事にまとめてます:

リーダーボードなどで無視されがちな GLM-4-32B もプロンプトをしっかり書けば良い結果が得られそうだなぁ、と思っていたところに OpenAI さんがオープンウェイトを出してきたので、他の LLM の細かい話は色々うっちゃって、当面は gpt-oss をいじるのが正解な気がしています。

ボクはいくつかの過去記事で、32GB RAM (ユニファイドメモリ) の Mac でローカル LLM を使うのは苦労と工夫が必要だよ!と書いてきたのですが、なんだかんだと 20B~30B パラメータ程の優秀な LLM が定期的にリリースされているので、とりあえず 32GB RAM の Mac を買えばそれなりに充実したローカル LLM ライフをエンジョイできる!そんな世の中になっていると言えそうです。イェイ!

Image by Stable Diffusion (Mochi Diffusion)

まだ gpt-oss の性能がこれまでのオープンウェイト LLM より大幅に勝っているかどうかわからないのでミスリーディングなあおり画像かもですね。とりあえずみんなかわいかったので採用。

Date:
2025年8月8日 1:05:22

Model:
realisticVision-v51VAE_original_768x512_cn

Size:
768 x 512

Include in Image:
major league baseball player with kids

Exclude from Image:

Seed:
850711837

Steps:
22

Guidance Scale:
20.0

Scheduler:
DPM-Solver++

ML Compute Unit:
CPU & GPU

アップルシリコン Mac でマイクラをネイティブ動作 + 路地裏MOD + シェーダ = 昭和最高

久しぶりにマイクラ熱の高まった娘でしたが、路地裏 mod とシェーダを入れるとクラッシュして起動できないと悲しんでいたので手助けしました。

思いのほか手間がかかったのでやり方を残しておきます。

環境

とりあえず使ったものやバージョンをご紹介。ある程度マイクラをやっている人は多分、ここと下のリンク集さえあればどうにかできるかと。

  • Mac: M1 Mac mini + 16GB RAM
  • macOS: Sequoia 15.5
  • Minecraft ライセンス: Java 版
  • アカウント: マイクロソフト
  • Minecraft 実行環境 (ランチャー): Prism Launcher 9.4
  • Minecraft のバージョン: 1.12.2
  • LWJGL 2: 2.9.4-nightly-20150209
  • Forge: 14.23.5.2859
  • Java: azul_zulu_jre8.0.332
  • (Mod) CTM: MC1.12.2-1.0.2.31
  • (Mod) OptiFine: 1.12.2_HD_U_G5_MOD
  • (Mod) Rojiura Mod: 1.12.2_0.1.4.a
  • リソースパック: RojiuraMod_ResourcePack_0.0.6b
  • シェーダーパック: BLS_v10.0

リンク

ポイント

M1 Mac に移行してからは MultiMC を使っていたのですが、1.12.2 で Forge と OptiFine と CTM の全てを有効にするとクラッシュするという状況から抜け出せず、別のランチャー Prism Launcher を使ったらうまくいきました。

路地裏MOD が動くマイクラバージョン 1.12.2 は 2017年 9月にリリースされたものらしく、Java や他の MOD 等も新しいバージョンだと動かないという事が多いです。上にあるバージョンは動作確認済みなので、極力同じバージョンのものを入手するのが成功の鍵です。

マイクラの購入やアカウントの作成など基本的なところには触れません。

途中で一度 Mac の管理者のパスワードが要求される場面があります。自分で管理者権限を持っていない人は、管理者がいるときに進めましょう。

あと、ボク自身はすぐに 3D 酔いしてしまうのでほとんど遊んでいません。なので、路地裏MOD の細かい使い方は他を探してください。また、この記事に貼っているマイクラ画像はほぼ全て娘から提供してもらったものです。

手順

Prism Launcher のインストール

Prism Launcher をダウンロードし、アプリケーションフォルダにインストールします。初回起動時には言語の選択とログインが必要になるので、それらも行っておきます。

OptiFine MOD の抽出

OptiFine 1.12.2 HD U G5 をダウンロードし、ダブルクリックして開きます。プライバシーに関するアラートが出ると思うので、以下の要領で対応します。

まずは「完了」をクリックして閉じます。次にアップルアイコンからシステム設定を開きます。

プライバシーとセキュリティを開き下にスクロールすると、アプリケーションの実行許可で OptiFine がブロックされていることが確認できるので、「このまま開く」をクリックします。

別のアラートが表示されるので、もう一度「このまま開く」をクリック。

すると管理者のユーザ名とパスワードが要求されるので、入力して OK。

うまくいけば OptiFine のインストーラが立ち上がります。真ん中の Extract をクリックしましょう。

ダウンロードなど適当なフォルダを選んで、「保存」します。これでマイクラで使用できる OptiFine の MOD ファイルが抽出されます。

成功したら OK で終了します。ダウンロードした OptiFine_1.12.2_HD_U_G5.jar も捨てて構いません。

路地裏MOD 関連の入手

Safari だとうまくダウンロードできないので、路地裏MOD のウェブサイトを Chrome 等のウェブブラウザで開きます。

テスト版を入手する → 利用規約に同意します。をチェック → ダウンロードページへ

(1) 路地裏MOD本体と、(2) 路地裏MOD専用リソースパックをダウンロードします (クリックしてからダウンロードされるまで少し時間がかかります)。

ダウンロードが完了しないときは、アドレスバー右のダウンロードアイコンをクリックして「保存」しよう

(4) ConnectedTexturesMod を開いて「CTMModをダウンロードしに行く」をクリックすると、英語の ConnectedTexturesMod のページが開きます。右手の「↓ Download」をクリックしましょう。

All Game Versions から「1.12.2」、All Mod Loaders から「Forge」を選び、正しいバージョンが選択されていることを確認したら「Download File」をクリックします。少し待つとファイルがダウンロードされます。

Prism Launcher の設定

さあやっとマイクラ本体の設定に取りかかれます。まずは Prism Launcher を起動し、左上の「起動構成を追加(E)」をクリックします。

適当に名前を付け、カスタムバージョンは 1.12.2、右下の Modローダーは Forge をクリックし、バージョンは 14.23.5.2859 にして OK しましょう。

今作ったワールドが選択されていることを確認し、「編集(E)」をクリックします。

Mod のインストール

コンソールウィンドウで Mod をクリックし、ダウンロードしておいた CTMOptiFinerojiuramod それぞれをドラッグアンドドロップします。

リソースパックのインストール

次にリソースパックを開き、RojiuraMod_ResourcePack をドラッグアンドドロップします。

シェーダーパックのインストール

シェーダーパックのダウンロードとインストールは Prism Launcher 内で行えます。シェーダーパックをクリックし、右手の「シェーダーをダウンロード」をクリックします。

BSL Shaders を選択し、「ダウンロードするシェーダーパックとして選択」をクリックします。すると「確認」ボタンが押せるようになるので、クリックします。

なぜかここは手順が多いですが、開いた画面で「OK」をクリックしてダウンロードします。

ダウンロードが終わるとインストール済みとして表示されます。

Java の設定

コンソールウィンドウの設定をクリックし、Java タブの「Java の設定」にチェックを入れます。

すでに JRE 8 をインストール済みであれば「自動検出」をクリックします。バージョン 1.8.0 でアーキテクチャが aarch64 のものを選択して「OK」します。

もし読み込まれない場合は、「Javaのダウンロード」をクリックします。

Azul ZuluJava 8、バージョン 8.0.332 を選択してダウンロードします (試してませんが、8 であれば他のバージョンなどでも大丈夫かもしれません)。

ダウンロード後、「自動検出」で 1.8.0.332 を選択すれば準備完了です。

起動と使い方

コンソールウィンドウの一番下にある「起動」ボタンをクリックし、いつもの画面が表示されれば勝ち確定です。

とりあえず Options > Language… から「日本語(日本)」を選びましょうか。

次にリソースパックの利用可能なリソースパックにある RojiuraMod_ResourcePack のアイコンをクリックして使用中に移動し、完了します。

設定に戻ったら「シェーダーの詳細設定…」をクリックし、BSL_V10.0.zip をクリックします。読み込まれるまで数秒かかるので心穏やかに待ちます。選択されたら、とりあえず完了を 3回クリックしてタイトル画面に戻ります。

あとは、シングルプレイ、ワールド新規作成、ゲームモード: クリエイティブ、と進んで「ワールド新規作成」しましょう。

E キーで持ち物を表示させ、上の [ < ] か [ > ] で路地裏MOD のアイテムを持つことができます。

以上です。お疲れ様でした。

最後に

ビデオやシェーダーの設定は色々といじったほうが良いと思います。建築の時はフレームレートを稼ぐために最小限の負荷にし、撮影するときだけシェーダーを有効にするとか。あと、BSL って自分影が覆い被さっている感じなので、もしかしたら他のシェーダーを選んだほうが良いかもしれません。

では最後に、昭和好きな娘の作品をいくつか貼って、路地裏MOD の作者さんに感謝しつつ終わりとします。

© Peddals.com