A complete guide to building production-grade backend systems — from distributed systems fundamentals to LLMs, RAG, and agentic AI. Code samples taken from real projects, interactive diagrams, and in-depth explanations.
Part I
distributed systems foundations
All modern backend architecture starts in this part: tenants, SaaS, auth, API design, caching. These are the foundation for everything that follows. AI systems live by the same rules.
01
core concepts & terminology
1.1
tenant, saas, multi-tenancy — the full picture
How are tenant, SaaS, multi-tenancy, and B2B/B2C related, and how do they work?
answer
A plain-language explanation
Tenant = "renter". You own the building and lease it to many companies. Each company sees its own office (its data) and nobody else's.
SaaS = Software as a Service. An application you open and use in the browser (Gmail, Slack, Notion). The customer installs nothing.
Multi-tenant SaaS = one application, a thousand customers. Each gets its own isolated workspace.
The 4 core components of a SaaS architecture
Multi-tenancy isolation — 3 core strategies
Strategy 1 — Database-per-tenant
A separate DB instance for every tenant. The strongest isolation, but the most expensive to operate.
db_router.py
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

class TenantDBRouter:
    """A separate connection pool per tenant"""

    def __init__(self):
        self._engines: dict[str, AsyncEngine] = {}

    async def get_engine(self, tenant_id: str) -> AsyncEngine:
        if tenant_id not in self._engines:
            # Look up the tenant's connection string in the metadata DB
            conn_str = await self._resolve_tenant_db(tenant_id)
            self._engines[tenant_id] = create_async_engine(
                conn_str, pool_size=5, max_overflow=10
            )
        return self._engines[tenant_id]

    async def _resolve_tenant_db(self, tenant_id: str) -> str:
        # lookup in the metadata DB (metadata_db is an app-level connection)
        tenant = await metadata_db.fetch_one(
            "SELECT db_host, db_name FROM tenants WHERE id = $1", tenant_id
        )
        return f"postgresql+asyncpg://{tenant.db_host}/{tenant.db_name}"
When to use it: regulatory requirements (HIPAA, banking), fewer than ~50 large enterprise customers, or per-customer backup/restore requirements.
Strategy 2 — Schema-per-tenant
One PostgreSQL instance, with a separate schema (a group of tables) per tenant.
schema_strategy.sql
-- A schema per tenant
CREATE SCHEMA tenant_acme;
CREATE SCHEMA tenant_globex;

-- The same tables inside each schema
CREATE TABLE tenant_acme.orders (id UUID PRIMARY KEY, amount DECIMAL);
CREATE TABLE tenant_globex.orders (id UUID PRIMARY KEY, amount DECIMAL);

-- search_path at query time
SET search_path TO tenant_acme;
SELECT * FROM orders; -- resolves to tenant_acme.orders
Upside: one DB means one backup. Downside: PostgreSQL slows down beyond ~10,000 schemas, and migrations must be run separately for every schema.
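A sketch of what "run migrations per schema" means in practice. The schema names below are the examples from above; a real setup would read the list from pg_namespace or a tenants table:

```python
# Replay the same DDL once per tenant schema by switching search_path.
TENANT_SCHEMAS = ["tenant_acme", "tenant_globex"]
MIGRATION = "ALTER TABLE orders ADD COLUMN notes TEXT;"

def per_schema_statements(schemas: list[str]) -> list[str]:
    """Build the statement list a migration runner would execute, in order."""
    stmts = []
    for schema in schemas:
        stmts.append(f"SET search_path TO {schema};")  # point at this tenant's tables
        stmts.append(MIGRATION)                        # same DDL, different schema
    return stmts
```

With 10,000 schemas this loop is exactly why migrations become the painful part of the schema-per-tenant strategy.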
Strategy 3 — Shared schema with tenant_id
One DB, one schema, a tenant_id column in every table. The most widely used approach.
shared_schema.sql
CREATE TABLE orders (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL,
    user_id UUID NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    status TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    CONSTRAINT fk_tenant FOREIGN KEY (tenant_id) REFERENCES tenants(id)
);

-- IMPORTANT: every composite index starts with tenant_id!
CREATE INDEX idx_orders_tenant_status
    ON orders(tenant_id, status)
    WHERE status != 'archived';

CREATE INDEX idx_orders_tenant_user_created
    ON orders(tenant_id, user_id, created_at DESC);

-- RLS — the last line of defense
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON orders
    USING (tenant_id = current_setting('app.tenant_id')::uuid);
Decision matrix — the real world
Criterion               | DB-per-tenant       | Schema-per-tenant | Shared schema
Isolation               | Highest             | High              | Medium (with RLS)
Cost per tenant         | $$$                 | $$                | $
Scale limit             | ~100 tenants        | ~1000 schemas     | Millions
Noisy neighbor          | None                | Low               | High (needs mitigation)
Migration               | Per DB              | Per schema        | Single run
Cross-tenant analytics  | Hard                | Medium            | Easy
Use case                | Healthcare, banking | B2B SMB           | Consumer SaaS
✦
Trend — the hybrid approach
The modern SaaS trend: shared schema by default, but a dedicated DB for premium enterprise customers. This "silver tier" architecture keeps standard customers cheap while giving enterprise full isolation. Snowflake, Databricks, and Notion all take this route.
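The hybrid routing decision can be sketched in a few lines. All names below are illustrative assumptions — in practice the metadata would live in a tenants table:

```python
# Shared pool by default; a dedicated DSN only for enterprise tenants.
SHARED_DSN = "postgresql://shared-cluster/app"

TENANT_META = {
    "acme":   {"tier": "enterprise", "dedicated_dsn": "postgresql://acme-db/app"},
    "globex": {"tier": "pro",        "dedicated_dsn": None},
}

def resolve_dsn(tenant_id: str) -> str:
    meta = TENANT_META[tenant_id]
    if meta["tier"] == "enterprise" and meta["dedicated_dsn"]:
        return meta["dedicated_dsn"]  # full isolation for premium tenants
    return SHARED_DSN                 # everyone else shares (tenant_id + RLS)
```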
1.2
performance metrics — rps, latency, percentiles
How do you measure and interpret RPS, p50, p95, p99, SLA, and SLO correctly?
answer
Why do these metrics matter?
As a backend engineer, you have three basic questions for measuring how well your system runs: How many requests are coming in? (throughput) How long do they take? (latency) How many of them fail? (error rate). Without these metrics you are hunting problems with your eyes closed — the system may work, but you are flying blind.
RPS and throughput — the volume dimension
RPS (Requests Per Second) — how many requests arrive per second. TPS (Transactions Per Second) — the number of DB transactions. The first question in any scaling conversation: "how many RPS do we need?"
Real-world examples (peak RPS):
─────────────────────────────────
Small startup MVP ~10 RPS
Growing B2B SaaS ~100-500 RPS
Popular consumer app ~1k-10k RPS
Twitter/X peak ~500k RPS
Google search ~100k RPS average, millions peak
Latency and percentiles — the experience dimension
Latency — how long a single request takes. But the average says almost nothing; percentiles are what matter.
What is a percentile?
Sort 100 requests by duration. p99 = the duration of the request in 99th position. In other words, 99% of requests are faster than it and 1% are slower.
100 requests, sorted (ms):
[10, 12, 15, 18, 20, ..., 50, ..., 200, 300, 800, 3000]
                          ↑             ↑          ↑
                         p50           p95        p99
                 half of requests   95% under   worst 1%
                 are under 50ms     200ms       take 3000ms
▲
Average — the lying metric
100 requests: 99 of them take 50ms and one takes 5000ms. The average = 99.5ms (looks healthy!), but p99 = 5000ms (in real life someone waited 5 seconds). Always work with percentiles.
# p99 over the last 5 minutes
histogram_quantile(0.99,
  sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
)

# Per-endpoint p95
histogram_quantile(0.95,
  sum(rate(http_request_duration_seconds_bucket[5m])) by (le, endpoint)
)

# RPS
sum(rate(http_request_duration_seconds_count[1m]))
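The "average lies" numbers above can be checked in a few lines. percentile() here uses one simple convention of several; production systems estimate percentiles from histograms, as in the Prometheus queries:

```python
# 99 requests at 50 ms, one at 5000 ms — the callout's example
latencies = [50] * 99 + [5000]

def percentile(values: list[float], p: float) -> float:
    ordered = sorted(values)
    # index p% of the way into the sorted list (one of several conventions)
    idx = min(len(ordered) - 1, int(p / 100 * len(ordered)))
    return ordered[idx]

avg = sum(latencies) / len(latencies)  # 99.5 — looks perfectly healthy
p50 = percentile(latencies, 50)        # 50
p99 = percentile(latencies, 99)        # 5000 — someone really waited 5 seconds
```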
SLA, SLO, SLI — reliability targeting
This triad is the language of reliability
Most people say "99.9% uptime" and leave it at that. A professional system separates three concepts: SLI — what you actually measure (for example, "99.2% of requests finished under 500ms"). SLO — your internal target ("99% of requests under 400ms"). SLA — the formal contract with the customer ("99.5% or your money back"). The SLO is always stricter than the SLA — so you get warned before you ever breach the SLA.
Error budget — the SRE's secret weapon
If your SLO is 99.9% uptime, you have roughly 43 minutes 49 seconds of downtime budget per month. That is your "error budget". Spend it wisely.
SLO %              | Year downtime | Month downtime | Week downtime
99.9%              | 8h 45m        | 43m 49s        | 10m 4s
99.95%             | 4h 22m        | 21m 54s        | 5m 2s
99.99% (4 nines)   | 52m 35s       | 4m 22s         | 1m 0s
99.999% (5 nines)  | 5m 15s        | 26.3s          | 6s
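The table values can be reproduced in a couple of lines; the average-month length (365.25/12 days) is what yields 43m 49s:

```python
AVG_MONTH_DAYS = 365.25 / 12  # ≈ 30.44 days per average month

def downtime_budget_minutes(slo: float, days: float = AVG_MONTH_DAYS) -> float:
    """Allowed downtime (in minutes) over `days` at a given SLO."""
    return (1 - slo) * days * 24 * 60

monthly_999 = downtime_budget_minutes(0.999)    # ≈ 43.8 min → 43m 49s
monthly_9999 = downtime_budget_minutes(0.9999)  # ≈ 4.4 min  → 4m 22s
```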
ⓘ
Five nines is very expensive
Each additional "nine" raises cost roughly 10x. A bank's core system may need 99.999%, but for a consumer SaaS 99.9% is enough. The smart business decision is choosing where the error budget goes: on features (ship faster, accept less stability) or on stability.
One tenant is slowing down the entire system — what are the options?
answer
The problem
100 tenants share a single PostgreSQL. Customer #47 exports 50 million rows every hour. While that runs, DB CPU hits 100% and the other 99 tenants slow down too. Everyone complains — but the cause is a single tenant.
The 5-layer mitigation strategy
Why 5 layers?
Each layer solves a different aspect of the problem. They complement each other — applying just one is not enough. L1 caps the number of requests, L2 the database connections, L3 separates heavy jobs, L4 provides physical isolation, and L5 gives the very largest tenants full ownership. A request passes through the layers top to bottom — if it exceeds a limit, it is stopped at that layer.
L1 — Token bucket rate limiter (Redis)
Token bucket — the bucket analogy
Imagine a separate bucket for every tenant. The bucket refills automatically over time (say, 10 tokens per second). Every request spends 1 token. If the bucket is empty, the request is rejected (429). Bursts are allowed — the bucket holds at most 20 tokens, so 20 requests can fire at once, after which you wait for it to refill.
Why a Redis Lua script? Because "check the token and deduct it" must be atomic — otherwise there is a race condition: two parallel requests both see "10 tokens available" and both get through.
X requests per second per tenant, with bursts allowed up to Y. An atomic operation via a Lua script:
token_bucket.py
import time
from dataclasses import dataclass

TOKEN_BUCKET_LUA = """
local key = KEYS[1]
local rate = tonumber(ARGV[1])      -- tokens per second
local capacity = tonumber(ARGV[2])  -- bucket size
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local bucket = redis.call('HMGET', key, 'tokens', 'last')
local tokens = tonumber(bucket[1]) or capacity
local last = tonumber(bucket[2]) or now

-- refill tokens for the time elapsed since the last call
local elapsed = math.max(0, now - last)
tokens = math.min(capacity, tokens + elapsed * rate)

if tokens < requested then
    return {0, tokens}  -- reject
end

tokens = tokens - requested
redis.call('HMSET', key, 'tokens', tokens, 'last', now)
redis.call('EXPIRE', key, 3600)
return {1, tokens}
"""

@dataclass
class Limits:
    rate: int   # tokens per second
    burst: int  # bucket capacity

TIER_LIMITS = {
    "free": Limits(rate=10, burst=20),  # 10 req/sec
    "pro": Limits(rate=100, burst=200),
    "enterprise": Limits(rate=1000, burst=2000),
}

class TenantRateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.script = redis_client.register_script(TOKEN_BUCKET_LUA)

    async def check(self, tenant_id: str, cost: int = 1) -> tuple[bool, int]:
        # Limit depends on the tenant's plan (tenant_tier is resolved elsewhere)
        limits = TIER_LIMITS[tenant_tier(tenant_id)]
        allowed, remaining = await self.script(
            keys=[f"rl:{tenant_id}"],
            args=[limits.rate, limits.burst, time.time(), cost]
        )
        return bool(allowed), int(remaining)
L3 — Heavy workload queue isolation (Celery)
Running fast and heavy tasks in separate worker pools — the single most effective technique.
A big tenant's export can no longer poison the fast queue. The result: user actions (login, order) always complete quickly, while heavy jobs run slower (which is expected).
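That split can be sketched as Celery configuration; the queue and task names here are assumptions for illustration:

```python
from celery import Celery

app = Celery("app", broker="redis://localhost:6379/0")

# Route user-facing tasks and heavy tasks to different queues
app.conf.task_routes = {
    "tasks.send_notification": {"queue": "fast"},   # must always be snappy
    "tasks.export_orders":     {"queue": "heavy"},  # big tenant exports land here
}
app.conf.task_acks_late = True  # re-deliver if a worker dies mid-export

# Separate worker pools, one per queue — the heavy pool is deliberately small:
#   celery -A tasks worker -Q fast  --concurrency=16
#   celery -A tasks worker -Q heavy --concurrency=2
```

Even if tenant #47 floods the heavy queue, the fast workers never see those jobs.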
02
auth & access control
2.1
authentication — session, jwt, oauth2, passkeys
What auth techniques exist in 2026, and when is each one used?
answer
The auth landscape — choosing a technique
Session-based auth (the classic, still alive)
How does session auth work?
When the user logs in, the server stores their data server-side (in Redis) and hands the browser only an ID — like a hotel key-card number. On every subsequent request the browser sends that ID, and the server checks Redis to answer "who is this?".
Upside: logout is trivial — delete the entry from Redis. You can also kill every session at once (e.g. "sign out of all devices"). Downside: every request hits Redis, and horizontal scaling needs either sticky sessions or a centralized Redis.
session_auth.py
from fastapi import Request, Response, Cookie, HTTPException
import json, secrets, time

SESSION_TTL = 3600 * 24 * 7  # 7 days

async def login(email: str, password: str, request: Request, response: Response):
    user = await verify_credentials(email, password)
    if not user:
        raise HTTPException(401, "Invalid credentials")

    # Create the session ID
    sid = secrets.token_urlsafe(32)
    await redis.setex(
        f"session:{sid}",
        SESSION_TTL,
        json.dumps({
            "user_id": str(user.id),
            "tenant_id": str(user.tenant_id),
            "created_at": time.time(),
            "ip": request.client.host,
            "user_agent": request.headers.get("user-agent", "")
        })
    )

    # Secure cookie
    response.set_cookie(
        key="sid",
        value=sid,
        max_age=SESSION_TTL,
        httponly=True,   # not readable from JS (XSS protection)
        secure=True,     # HTTPS only
        samesite="lax",  # CSRF protection
    )
    return {"status": "logged_in"}

async def get_current_user(sid: str = Cookie(None)):
    if not sid:
        raise HTTPException(401)
    data = await redis.get(f"session:{sid}")
    if not data:
        raise HTTPException(401)
    return User(**json.loads(data))

async def logout(sid: str = Cookie(None)):
    await redis.delete(f"session:{sid}")
JWT — stateless, scalable
JWT — the passport analogy
Session-based auth is a hotel key card: every use makes the hotel system check "is this card valid?". A JWT is a passport with a state seal: the border officer does not call the issuing state's database each time — they just verify the seal. If the seal checks out, you pass.
The consequence: with 10 microservices, each one verifies the JWT independently — no round trip to a central auth server. At high traffic that is a major advantage.
JWT's big problem — revocation
Lose your passport and it stays valid for the next 10 years. JWT is the same — once issued, there is no way to "cancel" a token (without a blacklist). Hence the production pattern: short-lived access tokens (15 min) + refresh tokens (30 days, stored in Redis). Revoking a refresh token is easy — delete it from Redis. The access token keeps working until it expires (15 min) — an acceptable compromise.
OAuth2 — delegated authorization
Imagine you sign in to Spotify and it wants to see your Google contacts. The naive solution — hand Spotify your Google password — is a terrible security mistake: Spotify gains access to your Gmail, Drive, YouTube, everything. If Spotify gets hacked, your Google account goes down with it.
OAuth solves this: you sign in to Google directly, and Google grants Spotify a limited permission (a token) — only for the requested scopes. Spotify never sees your password.
Authorization Code Flow — the safest flow for server-side web apps. 8 steps across two distinct channels:
Why the AUTH_CODE → token exchange? (steps 5→6→7)
At step 5 Google returns an AUTH_CODE — a one-time temporary key valid for about 60 seconds. Why not hand over the token directly? Because step 5 travels through the browser — it appears in the URL and stays in browser history. If an attacker captures the URL, all they get is a spent code. At step 6 your server exchanges the AUTH_CODE using its client_secret — that channel runs strictly between your server and Google; the browser never sees it.
oauth_callback.py
import os
from fastapi import FastAPI, Request
from authlib.integrations.starlette_client import OAuth

app = FastAPI()
oauth = OAuth()
oauth.register(
    name='google',
    client_id=os.getenv('GOOGLE_CLIENT_ID'),
    client_secret=os.getenv('GOOGLE_CLIENT_SECRET'),
    server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
    client_kwargs={'scope': 'openid email profile'}
)

@app.get('/auth/google')
async def google_login(request: Request):
    redirect_uri = request.url_for('google_callback')
    return await oauth.google.authorize_redirect(request, redirect_uri)

@app.get('/auth/google/callback')
async def google_callback(request: Request):
    token = await oauth.google.authorize_access_token(request)
    user_info = token.get('userinfo') or await oauth.google.parse_id_token(request, token)
    # Find or create the user
    user = await find_or_create_user(
        email=user_info['email'],
        name=user_info.get('name'),
        google_sub=user_info['sub'],
    )
    # Issue your own session/tokens
    return issue_tokens(user)
Passkeys — the new standard for 2026
The replacement for passwords, built on WebAuthn + FIDO2. The private key lives on the user's device (protected by Touch ID, Face ID, or Windows Hello); the server stores only the public key.
✓
Why passkeys?
Apple, Google, and Microsoft made them the default starting in 2024; by 2026 every major SaaS supports them. Phishing-resistant (bound to the domain), no password (nothing to leak), and simple for users (Face ID).
When should you choose RBAC, ABAC, or ReBAC, and how do you implement each?
answer
RBAC (Role-Based Access Control)
The most classic approach. Role → permissions → users.
rbac_schema.sql
CREATE TABLE roles (
    id UUID PRIMARY KEY,
    tenant_id UUID NOT NULL,
    name TEXT NOT NULL,
    UNIQUE (tenant_id, name)
);

CREATE TABLE permissions (
    id UUID PRIMARY KEY,
    resource TEXT NOT NULL, -- 'orders', 'reports'
    action TEXT NOT NULL,   -- 'read', 'write', 'delete'
    UNIQUE (resource, action)
);

CREATE TABLE role_permissions (
    role_id UUID REFERENCES roles(id) ON DELETE CASCADE,
    permission_id UUID REFERENCES permissions(id) ON DELETE CASCADE,
    PRIMARY KEY (role_id, permission_id)
);

CREATE TABLE user_roles (
    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
    role_id UUID REFERENCES roles(id) ON DELETE CASCADE,
    PRIMARY KEY (user_id, role_id)
);
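A minimal in-memory sketch of the check that the role_permissions join performs (the roles, users, and permissions below are hard-coded for illustration):

```python
# role -> set of (resource, action) pairs, i.e. the role_permissions join result
ROLE_PERMS: dict[str, set[tuple[str, str]]] = {
    "admin":   {("orders", "read"), ("orders", "write"), ("orders", "delete")},
    "support": {("orders", "read")},
}
# user -> roles, i.e. the user_roles table
USER_ROLES: dict[str, set[str]] = {"alice": {"admin"}, "bob": {"support"}}

def has_permission(user: str, resource: str, action: str) -> bool:
    """Equivalent to: users ⋈ user_roles ⋈ role_permissions ⋈ permissions."""
    return any(
        (resource, action) in ROLE_PERMS.get(role, set())
        for role in USER_ROLES.get(user, set())
    )
```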
ABAC (Attribute-Based)
Decisions based on attributes: user attributes + resource attributes + context. For complex rules.
abac.py
def can_access_report(user, report, context):
    """Report visibility rules — deliberately complex"""
    # Admins see everything
    if "admin" in user.roles:
        return True
    # Own department
    if user.department_id == report.department_id:
        return True
    # Managers see their subordinates' reports
    if user.role == "manager" and report.author_id in user.subordinate_ids:
        return True
    # Finance team — only during working hours
    if "finance" in user.roles and report.type == "financial":
        now = context["now"]
        if 9 <= now.hour < 18 and now.weekday() < 5:
            return True
    # Region restriction
    if report.classification == "confidential":
        if user.region != report.region:
            return False
    return False
ReBAC (Relationship-Based) — Google Zanzibar style
Google Drive, GitHub, and Notion use this model. Access is expressed as "user X has relation R to resource Y".
Example relationships:
user:alice → owner → document:report_2026
user:bob → editor → document:report_2026
group:finance → viewer → document:report_2026
user:carol → member → group:finance
Query: "Can Carol see report_2026?"
Graph: carol → member → finance → viewer → report_2026 ✓
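The Carol lookup can be sketched as a graph walk over relationship tuples — a toy, in-memory stand-in for a Zanzibar-style store:

```python
from collections import deque

# (subject, relation, object) tuples — the example relationships above
TUPLES = {
    ("user:alice", "owner", "document:report_2026"),
    ("user:bob", "editor", "document:report_2026"),
    ("group:finance", "viewer", "document:report_2026"),
    ("user:carol", "member", "group:finance"),
}

def can_view(user: str, doc: str) -> bool:
    """Can `user` reach `doc` via owner/editor/viewer, possibly through groups?"""
    allowed = {"owner", "editor", "viewer"}
    # subjects that directly hold an allowed relation on the doc
    holders = {s for (s, r, o) in TUPLES if o == doc and r in allowed}
    frontier, seen = deque([user]), {user}
    while frontier:
        subj = frontier.popleft()
        if subj in holders:
            return True
        # expand memberships: user -> member -> group, group -> member -> group...
        for (s, r, o) in TUPLES:
            if s == subj and r == "member" and o not in seen:
                seen.add(o)
                frontier.append(o)
    return False
```

Real Zanzibar-style systems (e.g. OpenFGA, SpiceDB) do essentially this walk, heavily cached and sharded.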
Not application code — policy as code. With OPA, authorization rules are written in a declarative language (Rego):
policy.rego
package app.authz

default allow = false

# Admins can do everything
allow {
    input.user.roles[_] == "admin"
}

# Owners can access their own resources
allow {
    input.resource.owner_id == input.user.id
}

# Readers may only read published resources
allow {
    input.action == "read"
    input.resource.status == "published"
    input.user.roles[_] == "reader"
}

# Workday restriction for finance
allow {
    input.user.department == "finance"
    input.resource.type == "financial_report"
    workday
    time.clock(time.now_ns())[0] >= 9
    time.clock(time.now_ns())[0] < 18
}

workday {
    day := time.weekday(time.now_ns())
    day != "Saturday"
    day != "Sunday"
}
When to choose what?
Choose RBAC if
A simple hierarchy: admin/manager/user
Permissions rarely change
You need to ship quickly
A startup or a simple SaaS
RBAC falls short if
Per-resource owner/editor/viewer
Time- or location-dependent rules
Cross-tenant sharing (Google Drive)
Compliance (SOC2, GDPR) demands more
✦
Zero Trust — the default standard
"Trust nothing, verify everything". On every request: authenticate → authorize → audit. Internal services use mTLS too, via SPIFFE/SPIRE and an Istio service mesh. More on this in later chapters.
03
api design — all protocols
3.1
rest api — resource design, versioning, hateoas
How should a modern REST API be structured? Versioning, pagination, error handling.
answer
Resource modeling — URL structure
Good REST URLs (nouns, hierarchical):
✓ GET /api/v1/orders
✓ GET /api/v1/orders/abc-123
✓ POST /api/v1/orders
✓ PATCH /api/v1/orders/abc-123
✓ DELETE /api/v1/orders/abc-123
✓ GET /api/v1/users/u-456/orders (sub-resource)
Bad URLs (verbs, actions in URL):
✗ GET /api/getAllOrders
✗ POST /api/createOrder
✗ POST /api/orders/abc-123/update
✗ GET /api/order?id=abc-123
Actions that don't fit CRUD:
→ POST /api/v1/orders/abc-123/cancel (resource action)
→ POST /api/v1/orders/abc-123:refund (Google AIP style)
Versioning — 3 approaches
Strategy     | Example                              | Pros                     | Cons
URL path     | /api/v2/orders                       | Explicit, cache-friendly | Breaking change visible
Header       | Accept: application/vnd.app.v2+json  | Clean URLs               | Hard to cache
Query param  | ?version=2                           | Easy                     | Anti-pattern
Recommendation: URL path versioning — the simplest and clearest option in 95% of cases.
Pagination — 3 techniques
1. Offset pagination (the simplest, but risky)
offset.py
@app.get("/orders")
async def list_orders(page: int = 1, per_page: int = 20):
    # tenant_id is assumed to come from auth middleware
    offset = (page - 1) * per_page
    orders = await db.fetch_all(
        "SELECT * FROM orders WHERE tenant_id = $1 "
        "ORDER BY created_at DESC LIMIT $2 OFFSET $3",
        tenant_id, per_page, offset
    )
    total = await db.fetch_val(
        "SELECT COUNT(*) FROM orders WHERE tenant_id = $1", tenant_id
    )
    return {
        "data": orders,
        "meta": {"page": page, "per_page": per_page, "total": total}
    }
The problem: with OFFSET 100000, PostgreSQL reads 100k rows just to throw them away. Slow. And if the data changes between pages, a user can see some items twice.
2. Cursor pagination (the production standard)
cursor.py
import base64, json

def encode_cursor(value: dict) -> str:
    return base64.urlsafe_b64encode(json.dumps(value).encode()).decode()

def decode_cursor(cursor: str) -> dict:
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))

@app.get("/orders")
async def list_orders(cursor: str | None = None, limit: int = 20):
    limit = min(limit, 100)
    if cursor:
        c = decode_cursor(cursor)
        rows = await db.fetch_all("""
            SELECT * FROM orders
            WHERE tenant_id = $1 AND (created_at, id) < ($2, $3)
            ORDER BY created_at DESC, id DESC
            LIMIT $4
        """, tenant_id, c["created_at"], c["id"], limit + 1)
    else:
        rows = await db.fetch_all("""
            SELECT * FROM orders WHERE tenant_id = $1
            ORDER BY created_at DESC, id DESC LIMIT $2
        """, tenant_id, limit + 1)

    has_more = len(rows) > limit
    rows = rows[:limit]
    next_cursor = None
    if has_more and rows:
        last = rows[-1]
        next_cursor = encode_cursor({"created_at": last.created_at.isoformat(), "id": str(last.id)})
    return {"data": rows, "next_cursor": next_cursor, "has_more": has_more}
✓
The benefits of cursor pagination
O(log n) per query · resilient to concurrent data changes · ideal for infinite scroll · Twitter, Stripe, Shopify — all use cursors. The composite cursor (created_at + id) breaks ties.
An RFC 9457 Problem Details error body looks like this:
{
"type": "https://api.myapp.com/errors/validation",
"title": "Validation failed",
"status": 422,
"detail": "amount must be positive",
"instance": "https://api.myapp.com/orders",
"field": "amount",
"received": -100
}
HTTP status codes — choosing correctly
Status                 | When                          | Example
200 OK                 | Successful GET/PATCH          | Order data returned
201 Created            | POST created a new resource   | New order created
202 Accepted           | Async operation started       | Report generation queued
204 No Content         | Successful DELETE             | Order deleted
400 Bad Request        | Client sent bad input         | JSON parse failure
401 Unauthorized       | Missing or invalid auth       | Token expired
403 Forbidden          | Authenticated, no permission  | Someone else's order
404 Not Found          | Resource does not exist       | Order not found
409 Conflict           | State conflict                | Duplicate email signup
422 Unprocessable      | Validation failed             | Pydantic error
429 Too Many Requests  | Rate limit                    | API limit exceeded
500 Server Error       | Our bug                       | Unhandled exception
502 Bad Gateway        | Upstream failure              | External API down
503 Unavailable        | Temporarily down              | Deployment in progress
3.2
graphql vs grpc vs trpc — when to use which?
What protocols exist besides REST, and when are they used?
answer
GraphQL — flexible query language
A plain-language explanation
REST: the server decides "what to return". GraphQL: the client declares "this is exactly what I need". It solves the client's underfetching/overfetching problem.
graphql query
query GetUserDashboard($userId: ID!) {
  user(id: $userId) {
    name
    email
    orders(limit: 5, status: ACTIVE) {
      id
      total
      items {
        product { name, price }
        quantity
      }
    }
    notifications(unread: true) {
      id
      message
    }
  }
}
One request → the whole dashboard's data. With REST this would take 4-5 requests.
GraphQL +
The client gets exactly the data it needs
No versioning needed (schema evolution)
Strong typing (schema)
Great tooling (Apollo, Relay)
GraphQL −
The N+1 problem (DataLoader required)
Caching is hard
File uploads are awkward
Learning curve
Query complexity is a security concern
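The N+1 fix is easiest to see in a toy illustration of the DataLoader idea (this is not the actual DataLoader API — just the principle): collect individual lookups, then resolve the whole batch with one query.

```python
QUERIES_RUN: list[str] = []           # instrumentation: count queries issued
DB_USERS = {1: "alice", 2: "bob", 3: "carol"}

def fetch_users_batched(ids: list[int]) -> dict[int, str]:
    # ONE query for the whole batch — "WHERE id = ANY($1)" in SQL terms
    QUERIES_RUN.append(f"SELECT * FROM users WHERE id IN {tuple(ids)}")
    return {i: DB_USERS[i] for i in ids}

class Loader:
    def __init__(self, batch_fn):
        self.batch_fn, self.pending = batch_fn, []
    def load(self, key):
        self.pending.append(key)      # defer — do not query yet
    def dispatch(self) -> dict:
        result = self.batch_fn(self.pending)
        self.pending = []
        return result

loader = Loader(fetch_users_batched)
for uid in [1, 2, 3]:                 # a naive resolver would run 3 queries here
    loader.load(uid)
users = loader.dispatch()             # batched: exactly 1 query
```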
gRPC — high-performance RPC
Created by Google. HTTP/2 + Protocol Buffers. 2-10x faster than REST. Ideal for microservice-to-microservice calls.
tRPC — end-to-end type safety for TypeScript mono-repos
// Client — fully aware of the backend's types
const { data, isLoading } = trpc.getOrder.useQuery({ id: "abc-123" });
// ^^^^ type: Order | undefined, fully typed
const createOrder = trpc.createOrder.useMutation();
createOrder.mutate({ userId: "u-1", items: [{ productId: "p-1", qty: 2 }] });
// If you pass wrong type — TypeScript error at compile time
The decision matrix
Protocol   | When to choose                 | Performance | DX
REST       | Public API, classic CRUD       | Good        | Good
GraphQL    | Mobile + web, complex UI       | Average     | Great
gRPC       | Microservice-to-microservice   | Best        | Average
tRPC       | TS mono-repo (Next.js)         | Good        | Best
WebSocket  | Real-time bidirectional        | Good        | Average
SSE        | Server → client stream         | Good        | Good
✦
The hybrid approach — recommendation
In practice: REST for the public API, gRPC between internal services, WebSocket for real-time, GraphQL for mobile clients (if the workload justifies it). One architecture using several protocols together is perfectly normal.
3.3
http request-to-response — the full lifecycle
What happens between the browser's "Enter" and the response arriving? How does each step work?
answer
Why does this matter?
Most developers think "I send a request, a response comes back." But in between sit 15+ layers and decision points. Where do performance problems live? Where do security checks run? Where does caching win? How does the load balancer decide? Optimizing a system without knowing these is riding a horse blindfolded.
What happens at each layer — a deep dive
1. DNS — the phone book
api.myapp.com → the browser checks its local cache first (milliseconds), then the OS DNS cache, then the ISP's DNS server (50-100ms), and finally root DNS (rare). DNS TTL matters: 300 seconds = a fresh lookup every 5 minutes. With a CDN, Anycast DNS routes you to the geographically nearest server. DNS propagation — a domain change can take 24-48 hours, so lower the TTL before the cutover.
2. TLS handshake — agreeing on a secret key
On the first connection: (1) the client says "Hello" and lists the encryption it supports, (2) the server sends its certificate, (3) the client validates the certificate against a CA (Certificate Authority), (4) a session key is agreed. That is ~1-2 round trips = 60-120ms. On later connections, TLS session resumption drops this cost to zero. With HTTP/2 multiplexing, parallel requests share one TLS session.
3. Load balancer — distributing traffic
The LB (Nginx, HAProxy, AWS ALB) decides which server handles the request. Algorithms: round-robin (take turns), least connections (to the least busy server), IP hash (one client always hits the same server — session affinity), weighted (stronger servers get more). LB health checks: GET /health every 5 seconds — a server that fails to answer is taken out of rotation.
4. Middleware — the gatekeepers
Before the request reaches the app handler, it passes through a chain of "filters". Each filter makes its own decision — continue or stop. In FastAPI: auth middleware (JWT verification), CORS (cross-origin headers), request ID (a UUID for tracing), rate limiting, body size limits, timeouts. An error discovered late in this chain — say, after auth and rate limiting both pass and only then the DB row turns out to be missing — has already burned more resources. That is why the order of checks matters.
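The filter chain can be sketched in pure Python (all names below are invented for illustration); each layer either stops the request or passes it inward:

```python
# Each middleware wraps the next handler; the outermost one runs first.
def rate_limit(handler):
    def wrapped(req: dict) -> dict:
        if req.get("over_limit"):
            return {"status": 429}     # stopped at the cheapest check
        return handler(req)
    return wrapped

def auth(handler):
    def wrapped(req: dict) -> dict:
        if not req.get("token"):
            return {"status": 401}     # never reaches the app handler
        return handler(req)
    return wrapped

def app_handler(req: dict) -> dict:
    return {"status": 200, "body": "order created"}

# Chain order matters: rate limiting runs before auth, auth before the handler.
handler = rate_limit(auth(app_handler))
```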
Status   | Name              | When                                              | Common mistake
201      | Created           | POST created a new resource; add a Location header | Returning 200 for POST
204      | No Content        | Successful DELETE, no body                        | –
400      | Bad Request       | Validation error (schema, format)                 | Calling a server error 400
401      | Unauthorized      | Token missing or invalid                          | Using 401 where 403 belongs
403      | Forbidden         | Authenticated, but not allowed                    | Hiding the resource as "not found"
404      | Not Found         | Resource does not exist                           | Calling every error 404
409      | Conflict          | Duplicate entry, concurrent modification          | –
422      | Unprocessable     | FastAPI default — Pydantic validation error       | –
429      | Too Many Requests | Rate limited; add a Retry-After header            | –
500      | Internal Error    | Server bug — the stack trace must NOT reach users | Exposing the stack trace
502/503  | Bad/Unavail       | Upstream service down or overloaded               | –
RFC 9457 Problem Details — professional error format
Why a standard error format?
If every developer returns errors in their own format, the frontend has to write separate error handling for every endpoint. RFC 9457 (formerly RFC 7807) is the standard JSON format for errors in HTTP APIs: type is a machine-readable URI, title is human-readable, and detail carries the specifics. It is straightforward to produce from FastAPI.
error_handling.py
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel

# RFC 9457 — Problem Details format
class ProblemDetail(BaseModel):
    type: str           # URI — machine-readable
    title: str          # short error name
    status: int         # HTTP status code
    detail: str         # specific, useful information
    instance: str = ""  # which URL it happened on

# Custom exception
class AppError(Exception):
    def __init__(self, type: str, title: str, detail: str, status: int = 400):
        self.type = type
        self.title = title
        self.detail = detail
        self.status = status

app = FastAPI()

@app.exception_handler(AppError)
async def app_error_handler(request: Request, exc: AppError):
    return JSONResponse(
        status_code=exc.status,
        content={
            "type": f"https://docs.myapp.com/errors/{exc.type}",
            "title": exc.title,
            "status": exc.status,
            "detail": exc.detail,
            "instance": str(request.url),
        },
        headers={"Content-Type": "application/problem+json"}
    )

# Usage:
@app.post("/orders")
async def create_order(data: OrderCreate):
    if data.amount <= 0:
        raise AppError(
            type="invalid-amount",
            title="Invalid Order Amount",
            detail=f"Amount must be positive, got {data.amount}",
            status=400
        )
    if not await check_inventory(data.product_id):
        raise AppError(
            type="out-of-stock",
            title="Product Out of Stock",
            detail=f"Product {data.product_id} is currently unavailable",
            status=409
        )
    ...

# Response:
# HTTP 409
# Content-Type: application/problem+json
# {
#   "type": "https://docs.myapp.com/errors/out-of-stock",
#   "title": "Product Out of Stock",
#   "status": 409,
#   "detail": "Product abc-123 is currently unavailable",
#   "instance": "/orders"
# }
3.4
websocket, sse, long polling — real-time protocols
Which protocol do you use for real-time data delivery, and when?
answer
The HTTP polling problem — why new protocols were needed
Classic HTTP: the client sends a request, the server responds — and the connection closes. But for chat, live notifications, or stock prices, the server must be able to message the client first. The HTTP polling workaround — "ask the server every second" — means 99% of requests get "nothing new" as an answer, yet every one of them still burns server resources.
Technique      | How it works                                | Latency   | Server load               | When
Short polling  | GET every N seconds                         | N seconds | Very high                 | Don't
Long polling   | Server holds until there is news (30-60s)   | ~0ms      | Medium (held connections) | As a fallback
SSE            | HTTP keep-alive, server push only           | <50ms     | Low                       | One-way: logs, AI streams
WebSocket      | Bidirectional persistent TCP                | <10ms     | Medium                    | Chat, collaboration, gaming
WebRTC         | P2P, UDP-based                              | <5ms      | Very low (P2P)            | Video/voice calls
SSE — Server-Sent Events: ideal for LLM streaming
Why is SSE great for LLM streams?
ChatGPT's text appearing "letter by letter" — that is SSE. The LLM generates tokens one at a time, and with SSE each token is pushed to the browser the moment it arrives. WebSocket is unnecessary — the user only receives (the initial prompt arrives over plain HTTP). SSE's advantages: plain HTTP, automatic reconnect, native browser support.
sse_stream.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import AsyncAnthropic
import json

app = FastAPI()
client = AsyncAnthropic()

@app.get("/api/chat/stream")  # GET, so the browser's EventSource can connect
async def chat_stream(prompt: str, user_id: str):
    """Stream the LLM answer over SSE"""
    async def generate():
        # SSE format: "data: {json}\n\n"
        # Signal that the connection is established
        yield f"data: {json.dumps({'type': 'start'})}\n\n"
        full_text = ""
        try:
            # Claude streaming API
            async with client.messages.stream(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            ) as stream:
                async for text_chunk in stream.text_stream:
                    full_text += text_chunk
                    # Push each token the moment it arrives
                    yield f"data: {json.dumps({'type': 'chunk', 'text': text_chunk})}\n\n"
            # Completion signal
            yield f"data: {json.dumps({'type': 'done', 'total_chars': len(full_text)})}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable Nginx buffering
            "Connection": "keep-alive",
        }
    )

# On the frontend (JavaScript):
# const es = new EventSource('/api/chat/stream?prompt=...');
# es.onmessage = (e) => {
#   const data = JSON.parse(e.data);
#   if (data.type === 'chunk') appendText(data.text);
#   if (data.type === 'done') es.close();
# };
04
caching & performance
4.1
caching layers — browser, cdn, app, db
Cache qayerda ishlatiladi va har bir qatlam nima qiladi?
javob
Cache nima va nima uchun kerak?
Cache — bu "oldindan javob tayyorlab qo'yish". Tasavvur qiling: qo'shni har kuni sizdan "soat nechada?" deb so'raydi. Har safar telefonga qarab vaqtni ko'rsatish o'rniga, bitta katta soat devorga osilib qo'ysa — minglagan so'rovga bitta javob. Kompyuterda ham xuddi shunday: DB'ga yoki API'ga qayta-qayta borish o'rniga, natijani vaqtinchalik joyda saqlab, keyingi so'rovda shu joydan olinadi.
Cache'ning kuchi: faqat bitta qatlam emas, 5 ta qatlam bor — har biri turli muammoni hal qiladi. Browser cache Internet traffic'ni tejaydi. CDN geografik masofani qisqartiradi. App memory sub-millisecond javob beradi. Redis distributed state'ni saqlaydi. DB query cache og'ir hisoblashlarni qayta ishlamaydi.
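Qatlamlardan birini — app memory (3-qatlam) cache'ni minimal sketch'da ko'rsatsak. `MemoryCache` nomi shartli; bu faqat g'oyani ko'rsatadi, production'da `cachetools` kabi tayyor kutubxona ishlatiladi:

```python
import time

class MemoryCache:
    """3-qatlam: process ichidagi TTL cache — sub-millisecond o'qish.
    Distributed emas: har pod o'z nusxasini saqlaydi."""
    def __init__(self):
        self._store: dict[str, tuple[float, object]] = {}

    def set(self, key: str, value, ttl: float = 60.0):
        # Muddatni monotonic clock bilan saqlaymiz (wall-clock sakrashlardan himoya)
        self._store[key] = (time.monotonic() + ttl, value)

    def get(self, key: str):
        item = self._store.get(key)
        if item is None:
            return None
        expires_at, value = item
        if time.monotonic() >= expires_at:
            # TTL tugagan — lazy eviction
            del self._store[key]
            return None
        return value

cache = MemoryCache()
cache.set("user:1", {"name": "Ali"}, ttl=5.0)
```

Eslatma: bunday cache pod'lar orasida sinxron emas — shuning uchun uni faqat qisqa TTL bilan, Redis qatlamining ustiga qo'yish maqsadga muvofiq.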
5 qatlamli cache arxitekturasi
Cache invalidation — eng qiyin masala
ⓘ
Phil Karlton's law
"There are only two hard things in Computer Science: cache invalidation and naming things." — 1996 yilda aytilgan, hali ham haqiqat.
Pattern 1 — Write-through (predictable)
write_through.py
async def update_user(user_id: str, data: dict):
# 1. DB yangilanadi
user = await db.update(user_id, data)
# 2. Cache yangilanadi bir vaqtda
await redis.setex(f"user:{user_id}", 3600, json.dumps(user.dict()))
return user
Pattern 2 — Write-behind (fast, risky)
write_behind.py
async def increment_view_count(post_id: str):
    # Cache'ga tez yozamiz — DB'ga darhol emas
    await redis.incr(f"views:{post_id}")

# Background job — DB'ga keyingi batch'da yoziladi (har 30 sekundda)
async def flush_view_counts():
    while True:
        # Production'da KEYS o'rniga SCAN — Redis'ni bloklamaydi
        keys = [k async for k in redis.scan_iter("views:*")]
        if keys:
            async with db.transaction():
                for key in keys:
                    post_id = key.split(":")[1]
                    count = int(await redis.get(key) or 0)
                    await db.execute(
                        "UPDATE posts SET view_count = view_count + $1 WHERE id = $2",
                        count, post_id
                    )
                    # Eslatma: get va delete orasida kelgan increment yo'qoladi —
                    # aniqlik kritik bo'lsa GETDEL yoki pipeline ishlating
                    await redis.delete(key)
        await asyncio.sleep(30)
Pattern 3 — Cache-aside with versioning (flexible)
versioned_cache.py
async def get_product(product_id: str) -> dict:
# Global version prefix — mass invalidate uchun
version = await redis.get("cache:products:version") or "1"
key = f"product:v{version}:{product_id}"
cached = await redis.get(key)
if cached:
return json.loads(cached)
product = await db.fetch_product(product_id)
await redis.setex(key, 3600, json.dumps(product.dict()))
return product.dict()
async def invalidate_all_products():
"""Bitta product schema o'zgardi → hammasini bekor qilish"""
await redis.incr("cache:products:version")
# Eski key'lar TTL bilan tabiiy o'ladi (memory cleanup)
TTL (Time To Live) — to'g'ri tanlash
Data tipi
TTL
Sabab
Stock price
1-5 sekund
Real-time kerak
User profile
5-15 daqiqa
Kam o'zgaradi
Product catalog
1 soat
Admin vaqti-vaqtida o'zgartiradi
Country list
1 kun
Deyarli o'zgarmaydi
Feature flags
30 sekund
Toggle darhol ishlasin
LLM responses
1 soat
Token cost tejash
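Jadvaldagi TTL'larni aynan bir xil qiymat bilan qo'ysangiz, bir vaqtda yozilgan minglab key bir sekundda birga o'ladi — bu keyingi bo'limda ko'riladigan stampede'ga zamin yaratadi. Oddiy yechim — TTL'ga tasodifiy jitter qo'shish (sketch, funksiya nomi shartli):

```python
import random

def jittered_ttl(base_ttl: int, jitter_ratio: float = 0.1) -> int:
    """base_ttl atrofida ± jitter_ratio oralig'ida tasodifiy TTL.
    Masalan 3600s, 10% jitter → 3240..3960s oralig'i."""
    delta = int(base_ttl * jitter_ratio)
    return base_ttl + random.randint(-delta, delta)

# Redis'ga yozishda:
# await redis.setex(key, jittered_ttl(3600), json.dumps(value))
```

Natijada key'lar muddati vaqt bo'ylab "yoyiladi" va DB'ga bir vaqtning o'zida yuzlab refresh so'rovi kelmaydi.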
Cache stampede — klassik muammo
Popular key TTL tugadi → 1000 parallel request DB'ga uriladi → DB crash.
stampede_protection.py
async def get_with_stampede_protection(key: str, fetch_fn, ttl: int = 300):
    cached = await redis.get(key)
    if cached:
        return json.loads(cached)
    # Distributed lock — faqat bitta request DB'ga boradi
    lock_key = f"lock:{key}"
    acquired = await redis.set(lock_key, "1", nx=True, ex=10)
    if not acquired:
        # Boshqa request DB'dan olmoqda — kutamiz
        for _ in range(20):
            await asyncio.sleep(0.1)
            cached = await redis.get(key)
            if cached:
                return json.loads(cached)
        # 2 sek kutdik, hali yo'q — DB'ga o'zimiz boramiz (lock'siz)
        value = await fetch_fn()
        await redis.setex(key, ttl, json.dumps(value))
        return value
    # Lock bizniki — DB'dan olib, cache'ga yozamiz
    try:
        value = await fetch_fn()
        await redis.setex(key, ttl, json.dumps(value))
        return value
    finally:
        # MUHIM: faqat o'zimiz olgan lock'ni o'chiramiz —
        # aks holda boshqa request'ning lock'ini buzib qo'ygan bo'lardik
        await redis.delete(lock_key)
Part II
data & persistence layer
Database arxitekturasi, async sistemalar, event-driven patterns. Data — backend'ning yuragi. Bu bo'limda klassik PostgreSQL'dan vector database'lar va event sourcing'gacha.
Har index turi qachon kerak? Qanday to'g'ri tanlash va qanday ishlatish?
javob
Index — kitob mundarijasi analogiyasi
Index — kitobning mundarijasi. Mundarijasiz "PostgreSQL" so'zini topish uchun 800 sahifani boshdan oxir ko'rib chiqish kerak (Sequential Scan — O(n)). Mundarija bilan darhol "281-bet" topiladi (Index Scan — O(log n)). Lekin mundarija ham bepul emas: joy oladi, har yangi sahifa qo'shilganda yoki o'zgartirilganda yangilanishi kerak (INSERT/UPDATE sekinlashadi).
Amaliy qoida: index read'ni tezlashtiradi, write'ni sekinlashtiradi. Ko'p o'qiladigan, kam yoziladigan ustunlarga index qo'ying. Ko'p yoziladigan, kam o'qiladigan (masalan, audit log) — indexsiz. Keraksiz index — katta xato: write performance yo'qoladi, disk joyi band bo'ladi, query planner chalkashadi.
Index turlari — to'liq katalog
Type
Qachon
Misol
B-tree
Default. 95% case
Equality, range, sort
Hash
Faqat equality
Session ID lookup
GIN
JSONB, array, full-text
WHERE tags @> ARRAY['python']
GiST
Geospatial, ranges
PostGIS, tstzrange
BRIN
Very large sorted data
Time-series logs, 5B+ rows
SP-GiST
Non-balanced trees
IP ranges, geometric
Bloom
Multi-column equality
Wide table point lookups
HNSW (pgvector)
Vector similarity
AI embeddings (keyingi bo'lim!)
B-tree — composite va partial
btree_advanced.sql
-- Composite — LEFT-MOST RULE muhim
CREATE INDEX idx_orders_tenant_user_date
ON orders (tenant_id, user_id, created_at DESC);
-- Ishlaydi:
SELECT * FROM orders WHERE tenant_id = ? AND user_id = ? ORDER BY created_at DESC;
SELECT * FROM orders WHERE tenant_id = ? AND user_id = ?;
SELECT * FROM orders WHERE tenant_id = ?;
-- Ishlamaydi (index skip):
SELECT * FROM orders WHERE user_id = ?; -- tenant_id yo'q
SELECT * FROM orders WHERE created_at > ?; -- chap 2 yo'q
-- Partial index — faqat kerakli subsetga
CREATE INDEX idx_orders_active
ON orders (tenant_id, created_at DESC)
WHERE status IN ('pending', 'processing');
-- 90% order'lar "delivered" bo'lsa, index 10x kichikroq
-- Covering index (INCLUDE) — Index Only Scan
CREATE INDEX idx_users_email_covering
ON users (email)
INCLUDE (name, avatar_url);
-- SELECT name, avatar_url FROM users WHERE email = '...'
-- → index'dan to'g'ridan to'g'ri, heap'ga bormaydi
-- Expression index
CREATE INDEX idx_users_lower_email ON users (lower(email));
-- SELECT * FROM users WHERE lower(email) = '...' uchun
-- UNIQUE partial — soft delete uchun
CREATE UNIQUE INDEX idx_users_email_active
ON users (email)
WHERE deleted_at IS NULL;
-- Soft-deleted user'ning email'ini qayta ishlatish mumkin
GIN — JSONB va array uchun kuch
gin_indexes.sql
-- JSONB — flexible schema
CREATE TABLE events (
id UUID PRIMARY KEY,
tenant_id UUID NOT NULL,
event_type TEXT,
properties JSONB
);
-- GIN — har JSONB path'ga index
CREATE INDEX idx_events_properties ON events USING GIN (properties);
-- Tez query'lar:
SELECT * FROM events WHERE properties @> '{"user_type": "premium"}';
SELECT * FROM events WHERE properties ? 'campaign_id';
SELECT * FROM events WHERE properties -> 'metadata' ->> 'source' = 'google';
-- JSONB path ops class (tez va kichik, lekin kam ops)
CREATE INDEX idx_events_props_path ON events USING GIN (properties jsonb_path_ops);
-- Array
CREATE TABLE articles (id SERIAL, title TEXT, body TEXT, tags TEXT[]);
CREATE INDEX idx_articles_tags ON articles USING GIN (tags);
SELECT * FROM articles WHERE tags @> ARRAY['python', 'web']; -- hamma
SELECT * FROM articles WHERE tags && ARRAY['python', 'rust']; -- kamida 1
-- Full-text search
ALTER TABLE articles ADD COLUMN search_vector tsvector
GENERATED ALWAYS AS (
setweight(to_tsvector('english', coalesce(title, '')), 'A') ||
setweight(to_tsvector('english', coalesce(body, '')), 'B')
) STORED;
CREATE INDEX idx_articles_search ON articles USING GIN (search_vector);
SELECT *, ts_rank(search_vector, query) AS rank
FROM articles, plainto_tsquery('english', 'backend architecture') query
WHERE search_vector @@ query
ORDER BY rank DESC;
BRIN — massive time-series
5 milliard qator, sorted by time? B-tree 200 GB. BRIN 600 MB. Faqat "block range" saqlaydi (har 128 blok uchun min/max). Time-range query'lar uchun yetarli.
brin.sql
CREATE TABLE metrics (
id BIGSERIAL,
tenant_id UUID,
metric_name TEXT,
value DOUBLE PRECISION,
ts TIMESTAMPTZ NOT NULL
);
-- Time asosida BRIN (time-series natural ordering)
CREATE INDEX idx_metrics_ts_brin
ON metrics USING BRIN (ts)
WITH (pages_per_range = 128);
-- tenant_id composite BRIN
CREATE INDEX idx_metrics_tenant_ts_brin
ON metrics USING BRIN (tenant_id, ts);
Index qachon ZARARLI?
✕
Over-indexing xavf
Har index:
· Disk joy oladi (~10-30% table size)
· Write sekinlashtiradi (har INSERT/UPDATE har indexni yangilaydi)
· Memory/cache ishlatadi
· VACUUM vaqtini uzaytiradi
10+ index bo'lgan jadval — red flag. Auditing kerak.
Unused index'larni topish
unused.sql
-- Ishlatilmayotgan index'lar (indisunique pg_index'da — join kerak)
SELECT
    s.schemaname, s.relname AS tablename, s.indexrelname AS indexname,
    pg_size_pretty(pg_relation_size(s.indexrelid)) AS size,
    s.idx_scan
FROM pg_stat_user_indexes s
JOIN pg_index i ON i.indexrelid = s.indexrelid
WHERE s.idx_scan < 50          -- juda kam ishlatilgan
  AND NOT i.indisunique        -- unique'ni tashlab bo'lmaydi
ORDER BY pg_relation_size(s.indexrelid) DESC
LIMIT 20;
-- Dublikat index'lar
SELECT indrelid::regclass AS table,
array_agg(indexrelid::regclass) AS indexes
FROM pg_index
GROUP BY indrelid, indkey
HAVING count(*) > 1;
5.2
EXPLAIN ANALYZE — query tuning mastery
Sekin query'ni qanday aniqlash va optimallashtirish?
javob
EXPLAIN output'ini o'qish
explain_example.sql
EXPLAIN (ANALYZE, BUFFERS, VERBOSE, FORMAT TEXT)
SELECT o.*, u.name
FROM orders o
JOIN users u ON u.id = o.user_id
WHERE o.tenant_id = 'xxx'
AND o.status = 'pending'
ORDER BY o.created_at DESC
LIMIT 20;
/* Output (labels explained):
Limit (cost=0.85..12.34 rows=20 width=128)
^ ^ ^
estimate start total rows
(actual time=0.123..2.456 rows=20 loops=1)
^ ^
actual start actual end
-> Nested Loop (cost=0.85..5234.12 rows=1234 width=128)
-> Index Scan using idx_orders_tenant_status on orders o
(cost=0.43..234.56 rows=1234 width=64)
(actual time=0.012..0.456 rows=1234 loops=1)
Index Cond: (tenant_id = 'xxx' AND status = 'pending')
Buffers: shared hit=45 read=3
^^^ ^^^^
cache disk
-> Index Scan using users_pkey on users u
(cost=0.29..4.05 rows=1 width=64)
(actual time=0.001..0.001 rows=1 loops=1234)
^^^^^^^^^
1234 ta iteration!
Index Cond: (id = o.user_id)
Planning Time: 0.234 ms
Execution Time: 2.678 ms
*/
Node turlari — nima yomon, nima yaxshi
Node type
Nima
Yaxshi / Yomon
Index Only Scan
Index'dan to'g'ridan to'g'ri
★ Eng tez
Index Scan
Index → heap
✓ Yaxshi
Bitmap Heap Scan
Ko'p qator — bitmap orqali
◐ O'rtacha
Seq Scan
To'liq jadvalni o'qish
✕ Katta jadvalda yomon
Nested Loop
Har qator uchun inner loop
◐ Kichik N'da yaxshi
Hash Join
Hash table quradi
✓ Katta N'da yaxshi
Merge Join
Ikkala tomon sortlangan bo'lsa
✓ Sort'da tez
Sort
ORDER BY uchun
✕ Agar memory'dan oshsa (external)
Real optimization — 12 sek → 80 ms
slow_query.sql
-- BOSHLANG'ICH (12 sekund!)
SELECT
u.name, COUNT(o.id) AS orders, SUM(o.amount) AS revenue,
(SELECT MAX(created_at) FROM orders WHERE user_id = u.id) AS last_order
FROM users u
LEFT JOIN orders o ON o.user_id = u.id
WHERE u.tenant_id = 'xxx'
AND u.created_at > NOW() - INTERVAL '1 year'
GROUP BY u.id, u.name
ORDER BY revenue DESC NULLS LAST
LIMIT 100;
-- EXPLAIN ANALYZE ko'rsatdi:
-- 1. Seq Scan on users (tenant_id + created_at index yo'q)
-- 2. Correlated subquery MAX(created_at) - har user uchun alohida query!
-- 3. Nested Loop 500k iteration
-- YECHIM:
-- Index
CREATE INDEX idx_users_tenant_created ON users (tenant_id, created_at DESC);
CREATE INDEX idx_orders_user_created ON orders (user_id, created_at DESC);
-- Query rewrite — correlated subquery'ni LATERAL bilan
SELECT
u.name,
COALESCE(stats.orders, 0) AS orders,
COALESCE(stats.revenue, 0) AS revenue,
stats.last_order
FROM users u
LEFT JOIN LATERAL (
SELECT
COUNT(*) AS orders,
SUM(amount) AS revenue,
MAX(created_at) AS last_order
FROM orders o
WHERE o.user_id = u.id
) stats ON true
WHERE u.tenant_id = 'xxx'
AND u.created_at > NOW() - INTERVAL '1 year'
ORDER BY stats.revenue DESC NULLS LAST
LIMIT 100;
-- Natija: 12s → 80ms. 150x tezroq.
pg_stat_statements — asosiy debugging tool
pg_stat.sql
-- Extension
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
-- Eng sekin query'lar
SELECT
substring(query, 1, 80) AS query,
calls,
round(total_exec_time::numeric, 2) AS total_ms,
round(mean_exec_time::numeric, 2) AS avg_ms,
round(stddev_exec_time::numeric, 2) AS stddev_ms
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 20;
-- Eng ko'p I/O qiladiganlar
SELECT
substring(query, 1, 80) AS query,
calls,
shared_blks_read, -- disk read
shared_blks_hit -- cache hit
FROM pg_stat_statements
ORDER BY shared_blks_read DESC
LIMIT 20;
5.3
connection pooling — pgbouncer deep dive
Connection pool qanday ishlaydi va qaysi mode qachon ishlatiladi?
javob
Muammo
Har PostgreSQL connection ~10 MB RAM oladi + process fork. 1000 connection = 10 GB RAM faqat connection'larga. PostgreSQL'da max_connections odatda 100-200. Lekin sizning application 50 ta container × 10 connection = 500 connection kerak.
PgBouncer pool modes
Mode
Ishlash
Multiplexing
Cheklov
session
Client connect → disconnect
Yo'q (1:1)
Ko'p kerak emas
transaction
Har transaction uchun backend
Zo'r (50:1 OK)
No prepared stmt*, no LISTEN/NOTIFY
statement
Har statement uchun
Eng yaxshi
No transactions (SELECT faqat)
* psycopg 3.1+ va asyncpg 0.28+ — transaction pool bilan prepared statement'ni qo'llab-quvvatlaydi (protocol-level disable).
sqlalchemy_pool.py
from sqlalchemy.ext.asyncio import create_async_engine
# PgBouncer ORQALI connect qilamiz (6432 port)
engine = create_async_engine(
"postgresql+asyncpg://user:pass@pgbouncer.internal:6432/myapp",
pool_size=10,
max_overflow=5,
pool_pre_ping=True, # connection tekshirish (health check)
pool_recycle=3600, # har soatda qayta yaratish
connect_args={
# MUHIM: transaction pool mode uchun
"statement_cache_size": 0,
"prepared_statement_cache_size": 0,
}
)
Monitoring — PgBouncer stats
monitoring.sql
-- PgBouncer admin'ga connect
-- psql -h pgbouncer -p 6432 pgbouncer
SHOW POOLS;
-- cl_active | cl_waiting | sv_active | sv_idle
-- 45 | 0 | 18 | 2
SHOW STATS;
-- total_xact_count, total_query_count, avg_xact_time
SHOW CLIENTS;
-- clients va ularning state'i
✓
Real case — Beeline Uzbekistan
50 ta FastAPI pod, har biri 10 connection = 500 client connection. PostgreSQL max_connections = 200 (monitoring, backup, admin uchun ham kerak). PgBouncer transaction mode bilan: 500 client → 20 backend connection. Latency p99 kamaydi, PostgreSQL CPU 30%ga tushdi.
5.4
sharding, replication, read replicas
Vertical scale tugadi — horizontal qanday qilinadi?
javob
Scaling journey — order matters
Stage 1: Optimize → index, query tuning, caching (80% muammo hal)
Stage 2: Vertical → katta server (more CPU, RAM, NVMe) (10% hal)
Stage 3: Read replica → read trafikni distribute (5% hal)
Stage 4: Sharding → write'ni ham distribute (4% hal)
Stage 5: Rethink DB → different data model (NoSQL, vector) (1% hal)
Muammolarning aksariyati Stage 1'da hal bo'ladi. Stage 4'ga yetganingiz — odatda 1M+ user degani.
Replication topologies
Primary-Replica (eng oddiy)
replication_setup.sh
# Primary postgresql.conf
wal_level = replica
max_wal_senders = 10
wal_keep_size = 2GB
synchronous_commit = on  # local durability; sync replica uchun synchronous_standby_names ham kerak
# Replica uchun user
CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD '...';
# Replica setup
pg_basebackup -h primary -D /var/lib/postgresql/data \
-U replicator -P -W -R
# Replica automatic:
# standby.signal file yaratiladi
# primary_conninfo postgresql.auto.conf'ga yoziladi
Async vs Sync replication
Async (default) +
Primary write tez
Replica'ga bog'liq emas
Network issue primary'ni bloklash yo'q
Async −
Replication lag (100ms-5s)
Primary crash — oxirgi commit'lar yo'qolishi
Read-after-write consistency yo'q
read_after_write.py
"""Read-after-write pattern: just-updated data uchun primary'dan o'qish"""
class SmartDBRouter:
async def update_and_track(self, user_id: str, data: dict):
user = await self.primary.update_user(user_id, data)
# Marker — keyingi 5 sek primary'dan o'qish
await redis.setex(f"primary_only:{user_id}", 5, "1")
return user
async def read_user(self, user_id: str):
# Check — just updated?
if await redis.get(f"primary_only:{user_id}"):
return await self.primary.get_user(user_id)
# Lag check
lag = await self.get_replica_lag()
if lag > 10: # 10 sek dan ko'p
return await self.primary.get_user(user_id)
return await self.replica.get_user(user_id)
async def get_replica_lag(self) -> float:
result = await self.replica.fetch_val(
"SELECT EXTRACT(EPOCH FROM (NOW() - pg_last_xact_replay_timestamp()))"
)
return float(result or 0)
Sharding strategies
Strategy
Qachon
Pro/Con
Hash-based
Teng distribution kerak
+ Balance, − Range query qiyin
Range-based
Time-series, natural ranges
+ Range fast, − Hot spot
Tenant-based
SaaS (one shard per tenant)
+ Isolation, − Re-shard qiyin
Geographic
Multi-region compliance
+ GDPR, − Cross-region join
Consistent hashing
Dinamik shard qo'shish
+ Re-shard oson, − Murakkab
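Jadvaldagi hash-based strategiyani minimal sketch'da ko'rsatish mumkin: tenant_id'dan barqaror hash olib, shard tanlaymiz. md5 va `shard-N.internal` DSN'lari — faqat misol uchun shartli nomlar; Python'ning built-in `hash()` funksiyasi process'lar orasida barqaror emas, shuning uchun ishlatmaymiz:

```python
import hashlib

SHARD_COUNT = 8  # shartli qiymat

def pick_shard(tenant_id: str, shard_count: int = SHARD_COUNT) -> int:
    """Barqaror hash → shard index. Bir xil tenant doim bir shard'ga tushadi."""
    digest = hashlib.md5(tenant_id.encode()).hexdigest()
    return int(digest, 16) % shard_count

def shard_dsn(tenant_id: str) -> str:
    # Shartli DSN sxemasi — real tizimda metadata DB'dan lookup bo'ladi
    return f"postgresql://shard-{pick_shard(tenant_id)}.internal:5432/app"
```

Jadvalda aytilgan kamchilik shu yerda ko'rinadi: `shard_count` o'zgarsa, deyarli hamma tenant boshqa shard'ga "ko'chadi" — buni yumshatish uchun consistent hashing ishlatiladi.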
Citus — PostgreSQL sharding layer
citus.sql
-- Citus extension (native PostgreSQL sharding)
CREATE EXTENSION citus;
-- Shardlash uchun jadval
CREATE TABLE events (
id BIGSERIAL,
tenant_id UUID NOT NULL,
event_type TEXT,
payload JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Shard key — tenant_id
SELECT create_distributed_table('events', 'tenant_id');
-- Citus avtomatik 32 shard yaratadi (default)
-- Query'lar avtomatik to'g'ri shard'ga boradi
-- Cross-shard aggregate: parallel scatter-gather
-- Reference table (kichik, har shard'da copy)
CREATE TABLE tenants (id UUID PRIMARY KEY, name TEXT);
SELECT create_reference_table('tenants');
✦
Modern alternatives — new-gen databases
Klassik PostgreSQL sharding o'rniga tayyor scalable DB'lar: CockroachDB (distributed SQL), YugabyteDB (PostgreSQL-wire compatible), Neon (serverless PG with branching), PlanetScale (MySQL + Vitess). Operatsion yuk kamroq, lekin narxi yuqoriroq.
5.5
vector databases — ai infrastructure'ning yuragi
pgvector, Pinecone, Qdrant, Weaviate — qaysi birini tanlash? Amaliy realitet.
javob
Nima uchun vector DB?
AI model matn yoki rasmni embedding'ga aylantiradi — 384 yoki 1536 o'lchovli raqamlar vektori. "Bu matnga o'xshash matnlar qaysilari?" degan savolga javob — vector similarity. B-tree index'lar bu vazifaga yaroqsiz — HNSW yoki IVF kerak.
HNSW algoritmi — qanday ishlaydi?
HNSW = Hierarchical Navigable Small World
─────────────────────────────────────────
Layer 3 (eng tepa, kam nodes): O ──── O ──── O
│ │ │
Layer 2: O───O──O──O───O
│ │ │ │ │
Layer 1: O──O───O──O──O───O──O
│ │ │ │ │ │ │
Layer 0 (hamma nodes): O──O──O───O──O──O───O──O──O
Search: Top layer'dan boshlaydi, tez approximate,
quyiga tushib aniqlashtiradi.
O(log n) complexity, 95-99% recall.
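Keyingi bo'limdagi `<=>` operatori hisoblaydigan cosine distance'ning o'zini pure-Python'da ko'rsatsak (sketch — real o'lchamlarda bu hisobni index qiladi, siz emas):

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    """pgvector'dagi <=> bilan bir xil ma'no: 1 - cos(theta).
    0 → bir xil yo'nalish, 1 → ortogonal, 2 → qarama-qarshi."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)
```

HNSW'ning vazifasi — aynan shu masofani million vektor bo'yicha brute-force hisoblamasdan, graph bo'ylab navigatsiya qilib eng yaqinlarini topish.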
pgvector — eng oddiy boshlang'ich
Sizning stack'ingizda PostgreSQL bor bo'lsa, ko'pincha alohida vector DB kerak emas. pgvector 0.5.0'dan beri HNSW'ni qo'llab-quvvatlaydi. 50M vektor'gacha production uchun yetarli.
pgvector.sql
-- Extension
CREATE EXTENSION vector;
-- Embedding jadval
CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
content TEXT NOT NULL,
embedding vector(1536), -- OpenAI ada-002 dim
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- HNSW index (tez ANN search)
CREATE INDEX idx_documents_embedding
ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- ivfflat alternative (kichikroq RAM, lekin sekinroq)
-- CREATE INDEX ... USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
-- Query — eng yaqin 10 ta document
SELECT
id, content, metadata,
1 - (embedding <=> $1::vector) AS similarity
FROM documents
WHERE tenant_id = $2
ORDER BY embedding <=> $1::vector -- cosine distance
LIMIT 10;
-- Operators:
-- <-> Euclidean distance
-- <=> Cosine distance
-- <#> Inner product (negative)
Qdrant — production default
Rust'da yozilgan, eng tez filtered search (p50 4ms @ 1M vectors). Open-source + managed cloud.
Celery qanday ishlaydi? Production'da qanday qilinsa yaxshi?
javob
Nima uchun task queue? — Synchronous muammosi
Tasavvur qiling: restoran kassiriga mijoz keladi. Kassir "hozir oshpazga borib ovqatingizni pishirib kelaman" desa — navbatdagi 50 ta mijoz kutib turadi. Task queue — bu kassirning "chek berib, keyingi mijozga o'tish" texnikasi: mijozga darhol "qabul qilindi" javobi beriladi, oshpaz esa buyurtmani orqada bajaradi.
Celery holatlari: email yuborish (3 sek), PDF generatsiya (10 sek), ML inference (30 sek), report (5 min), nightly batch (soatlar). Bularning hammasini HTTP request ichida bajarish mumkin emas — user kutmaydi, connection timeout bo'ladi, server thread bloklanadi.
Celery arxitekturasi
Production-ready task
tasks.py
from celery import Celery, Task
from smtplib import SMTPRecipientsRefused
import structlog
# SMTPRateLimited — email provider kutubxonasidan keladigan shartli exception
logger = structlog.get_logger()
app = Celery(
'myapp',
broker='redis://redis:6379/0',
backend='redis://redis:6379/1'
)
app.conf.update(
task_acks_late=True, # faqat success'dan keyin ack
task_reject_on_worker_lost=True, # worker crash bo'lsa — qayta
task_track_started=True,
worker_prefetch_multiplier=1, # fair distribution
task_serializer='json',
result_serializer='json',
timezone='UTC',
enable_utc=True,
task_soft_time_limit=300, # 5 daqiqa soft
task_time_limit=360, # 6 daqiqa hard kill
)
app.conf.task_routes = {
'app.tasks.send_email': {'queue': 'fast', 'priority': 5},
'app.tasks.process_payment': {'queue': 'fast', 'priority': 9},
'app.tasks.generate_report': {'queue': 'heavy', 'priority': 3},
'app.tasks.embed_document': {'queue': 'gpu', 'priority': 5},
}
class RetryableTask(Task):
"""Base class — automatic retry with exponential backoff"""
autoretry_for = (ConnectionError, TimeoutError)
retry_backoff = True
retry_backoff_max = 300
retry_jitter = True
max_retries = 5
@app.task(
base=RetryableTask,
bind=True,
rate_limit='100/m',
acks_late=True,
)
def send_email(self, tenant_id: str, to: str, subject: str, body: str):
logger.info("send_email.start", task_id=self.request.id, tenant=tenant_id)
try:
result = email_service.send(to=to, subject=subject, body=body)
logger.info("send_email.success", task_id=self.request.id)
return {"status": "sent", "message_id": result.id}
except SMTPRateLimited as e:
# 1 daqiqadan keyin qayta
raise self.retry(exc=e, countdown=60)
except SMTPRecipientsRefused as e:
# Qayta urinib bo'lmaydi — DLQ ga
logger.error("send_email.invalid_recipient", to=to)
raise # task_failure_handler DLQ'ga yozadi
Celery Canvas — workflow orchestration
workflows.py
from celery import chain, group, chord
# CHAIN — sequential (birin-ketin)
workflow = chain(
fetch_user_data.s(user_id),
enrich_with_analytics.s(),
generate_report.s(),
upload_to_s3.s(),
notify_user.s(user_id)
)
workflow.apply_async()
# GROUP — parallel
header = group(
process_image.s(img_id) for img_id in image_ids
)
result = header.apply_async()
results = result.join() # wait all
# CHORD — parallel + callback
workflow = chord(
[process_image.s(img) for img in images], # parallel
combine_images.s() # callback
)
# MAP-REDUCE style
reduce_workflow = chord(
group(analyze_doc.s(doc) for doc in documents),
aggregate_results.s()
)
Scheduled tasks — Celery Beat
celery_beat.py
from celery.schedules import crontab
app.conf.beat_schedule = {
# Har 15 daqiqada
'cleanup-expired-sessions': {
'task': 'app.tasks.cleanup_sessions',
'schedule': 900.0,
},
# Har kuni 02:00 UTC
'daily-reports': {
'task': 'app.tasks.generate_daily_reports',
'schedule': crontab(hour=2, minute=0),
},
# Dushanba ertalab
'weekly-digest': {
'task': 'app.tasks.send_weekly_digest',
'schedule': crontab(day_of_week=1, hour=9, minute=0),
},
}
6.2
kafka — event streaming platform
Kafka nima? Qachon Celery yetmaydi va Kafka kerak bo'ladi?
javob
Celery vs Kafka — asosiy farq
Celery/RabbitMQ — task queue. Worker task'ni oladi, bajaradi, o'chiradi. Bir task — bir worker. Kafka — immutable event log. Event yozildi → o'chirmaysiz, kunlar/haftalar saqlaydi. Ko'plab mustaqil consumer group'lar bir xil event'ni o'z tezligida o'qiydi.
Misol: foydalanuvchi buyurtma berdi. Celery: "order_confirmation yuborish" vazifasi bitta worker'ga ketadi — bajarildi, tamom. Kafka: "order.placed" event yoziladi → Analytics service (o'z tezligida), Notification service (real-time), ML training pipeline (kechqurun), Accounting service (kunlik batch) — hammasi bir xil event'ni o'qiydi, bir-biriga ta'sir qilmaydi.
Kafka partition — parallel ishlash siri
Kafka topic bir nechta partition'ga bo'linadi. Har partition — tartiblangan log. Har partition bitta consumer (bir consumer group'da) tomonidan o'qiladi. Demak: 6 partition = 6 parallel consumer = 6x throughput. Ko'proq partition — ko'proq parallellik. Lekin partition sonida muvozanat kerak: juda ko'p partition = ko'p metadata, sekin leader election. Production'da 1 partition ≈ 10-100 MB/s throughput — shunga qarab hisoblang.
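Producer key bo'yicha partition tanlashini soddalashtirilgan sketch'da ko'rsatish mumkin. Haqiqiy Kafka default partitioner murmur2 hash ishlatadi — bu yerdagi md5 faqat g'oyani ko'rsatish uchun:

```python
import hashlib

def pick_partition(key: str, num_partitions: int = 6) -> int:
    """Bir xil key (masalan, order_id) doim bir partition'ga tushadi —
    shu tufayli bitta order'ning event'lari o'zaro tartibda qoladi."""
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions
```

Shu sababli key tanlash muhim: hamma event bitta key bilan yuborilsa, 6 partition bo'lsa ham faqat bittasi ishlaydi (hot partition).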
analytics_consumer.py
from aiokafka import AIOKafkaConsumer
async def run_analytics_consumer():
consumer = AIOKafkaConsumer(
'orders.events',
bootstrap_servers='kafka1:9092,kafka2:9092,kafka3:9092',
group_id='analytics-v1', # consumer group
enable_auto_commit=False, # manual commit muhim
auto_offset_reset='earliest',
value_deserializer=lambda v: json.loads(v.decode()),
max_poll_records=100,
)
await consumer.start()
try:
async for msg in consumer:
try:
# Idempotency check — event'ni qayta ishlatmaslik
if await already_processed(msg.value["event_id"]):
await consumer.commit()
continue
# Process
async with db.transaction():
await process_analytics_event(msg.value)
await mark_processed(msg.value["event_id"])
# Manual commit — success'dan keyin
await consumer.commit()
except Exception as e:
logger.error("consumer.error", error=str(e), offset=msg.offset)
# Commit qilmaymiz — qayta keladi
# N marta fail bo'lsa DLQ topic'ga yuboramiz
finally:
await consumer.stop()
Celery vs Kafka — tanlash
Aspect
Celery/RabbitMQ
Kafka
Model
Task queue
Event log
Retention
Bajarilgan = o'chirildi
Kun/hafta/umr saqlanadi
Consumer'lar
1 task — 1 consumer
1 event — N consumer
Throughput
10k msg/sec
1M+ msg/sec
Latency
Past (<10ms)
O'rtacha (10-50ms)
Ops complexity
Past
Yuqori (ZK/KRaft, partitions)
Use case
Email, payments, reports
Event sourcing, analytics, ML pipelines
6.3
event sourcing & cqrs
Event Sourcing va CQRS — nima, qachon, qanday?
javob
Klassik vs Event Sourcing
Klassik CRUD: jadvalda hozirgi state saqlanadi. balance = 100. Event Sourcing: jadvalda hamma o'zgarishlar saqlanadi. [+100, -20, +50, -30]. Hozirgi state — bu event'lar yig'indisi.
Event Sourcing misol — Wallet
wallet_events.py
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import List
@dataclass(frozen=True)
class WalletEvent:
wallet_id: str
event_type: str
occurred_at: datetime
version: int
@dataclass(frozen=True)
class WalletCreated(WalletEvent):
user_id: str
currency: str
@dataclass(frozen=True)
class MoneyDeposited(WalletEvent):
amount: Decimal
source: str
@dataclass(frozen=True)
class MoneyWithdrawn(WalletEvent):
amount: Decimal
destination: str
@dataclass
class Wallet:
id: str
user_id: str
balance: Decimal = Decimal("0")
version: int = 0
@classmethod
def from_events(cls, events: List[WalletEvent]) -> "Wallet":
"""State'ni event'lardan qayta tiklash"""
if not events or not isinstance(events[0], WalletCreated):
raise ValueError("Need WalletCreated first")
first = events[0]
wallet = cls(id=first.wallet_id, user_id=first.user_id, version=1)
for event in events[1:]:
wallet.apply(event)
return wallet
def apply(self, event: WalletEvent):
if isinstance(event, MoneyDeposited):
self.balance += event.amount
elif isinstance(event, MoneyWithdrawn):
if self.balance < event.amount:
raise InsufficientFunds()
self.balance -= event.amount
self.version = event.version
# Event store
class EventStore:
async def append(self, stream_id: str, events: List[WalletEvent], expected_version: int):
"""Optimistic concurrency control"""
async with db.transaction():
current = await db.fetch_val(
"SELECT COALESCE(MAX(version), 0) FROM events WHERE stream_id = $1",
stream_id
)
if current != expected_version:
raise ConcurrencyError(f"Expected {expected_version}, got {current}")
for event in events:
await db.execute("""
INSERT INTO events (stream_id, event_type, payload, version, occurred_at)
VALUES ($1, $2, $3, $4, $5)
""", stream_id, event.event_type, event_to_json(event), event.version, event.occurred_at)
CQRS — Command Query Responsibility Segregation
Write model (commands) va read model (queries) alohida optimallashtiriladi.
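Read model — event'lardan quriladigan projection. Yuqoridagi Wallet misolida minimal sketch (in-memory dict; real tizimda bu alohida read jadvali yoki Elasticsearch bo'ladi, `WalletBalanceProjection` nomi shartli):

```python
class WalletBalanceProjection:
    """Read model: har wallet uchun faqat hozirgi balance.
    Event stream'ni o'qib, query uchun denormalized ko'rinish quradi."""
    def __init__(self):
        self.balances: dict[str, float] = {}

    def handle(self, event: dict):
        wid = event["wallet_id"]
        if event["type"] == "WalletCreated":
            self.balances[wid] = 0.0
        elif event["type"] == "MoneyDeposited":
            self.balances[wid] += event["amount"]
        elif event["type"] == "MoneyWithdrawn":
            self.balances[wid] -= event["amount"]

projection = WalletBalanceProjection()
for e in [
    {"type": "WalletCreated", "wallet_id": "w1"},
    {"type": "MoneyDeposited", "wallet_id": "w1", "amount": 100.0},
    {"type": "MoneyWithdrawn", "wallet_id": "w1", "amount": 30.0},
]:
    projection.handle(e)
```

Bitta event stream'dan shunday bir nechta mustaqil projection qurish mumkin — balans uchun bittasi, analytics uchun boshqasi — har biri o'z query pattern'iga optimallashtirilgan.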
▲
Event Sourcing — majburiy emas
Har sistemaga ham kerak emas. Faqat quyidagi hollarda: audit log shart bo'lsa (bank, healthcare), "time travel" — tarixiy state'ni ko'rish kerak bo'lsa, yoki bir nechta read model (analytics, search) kerak bo'lsa. Aks holda — klassik CRUD bilan boshlang.
6.4
saga pattern & transactional outbox
Distributed transaction yo'q — microservice'lar qanday birgalikda ishlaydi?
javob
Saga — distributed transaction alternativi
Order yaratish: Order service, Payment service, Inventory service. Birida fail bo'lsa — oldingilarni rollback. 2PC ishlamaydi (network partition, complexity). Saga — bu compensating transactions zanjiri.
Choreography saga — event-driven
saga_choreography.py
"""Har service event publish qiladi, boshqalari subscribe qiladi"""
# Order service
async def create_order(user_id, items):
order = await db.insert_order(user_id, items, status="pending")
await events.publish("order.created", {
"order_id": order.id, "user_id": user_id,
"items": items, "total": order.total
})
return order
# Payment service (subscribe to order.created)
async def handle_order_created(event):
try:
charge = await stripe.charge(event["user_id"], event["total"])
await events.publish("payment.succeeded", {
"order_id": event["order_id"], "charge_id": charge.id
})
except PaymentError as e:
await events.publish("payment.failed", {
"order_id": event["order_id"], "reason": str(e)
})
# Inventory service (subscribe to payment.succeeded)
async def handle_payment_succeeded(event):
try:
await reserve_inventory(event["order_id"])
await events.publish("inventory.reserved", {"order_id": event["order_id"]})
except OutOfStock:
await events.publish("inventory.failed", {"order_id": event["order_id"]})
# Compensating action
await events.publish("payment.refund_requested", {"order_id": event["order_id"]})
# Order service (subscribe to all)
async def handle_inventory_reserved(event):
await db.update_order_status(event["order_id"], "confirmed")
async def handle_payment_failed(event):
await db.update_order_status(event["order_id"], "cancelled")
Orchestration saga — central coordinator
saga_orchestration.py
"""Central coordinator step'larni boshqaradi"""
class OrderSaga:
async def execute(self, order_data):
saga_id = uuid4()
state = {"order_id": None, "charge_id": None, "inventory_id": None}
try:
# Step 1: Create order
order = await order_service.create(order_data)
state["order_id"] = order.id
# Step 2: Charge payment
charge = await payment_service.charge(order.total)
state["charge_id"] = charge.id
# Step 3: Reserve inventory
inventory = await inventory_service.reserve(order.items)
state["inventory_id"] = inventory.id
# Step 4: Confirm
await order_service.confirm(order.id)
return order
except Exception as e:
# Rollback — compensating transactions
if state.get("inventory_id"):
await inventory_service.release(state["inventory_id"])
if state.get("charge_id"):
await payment_service.refund(state["charge_id"])
if state.get("order_id"):
await order_service.cancel(state["order_id"])
raise
Transactional Outbox — reliability pattern
Muammo
DB'ga order yozish va Kafka'ga event yuborish — ikki alohida sistema. Birida commit bo'lib, boshqasida fail bo'lsa — inconsistency. 2PC yo'q. Yechim: outbox.
outbox_pattern.sql
-- Outbox jadvali — business DB'da
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type TEXT NOT NULL, -- 'order', 'user'
aggregate_id UUID NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
occurred_at TIMESTAMPTZ DEFAULT NOW(),
published_at TIMESTAMPTZ,
retry_count INT DEFAULT 0
);
CREATE INDEX idx_outbox_unpublished
ON outbox (occurred_at)
WHERE published_at IS NULL;
outbox.py
# Business logic — ONE transaction, DB + outbox
async def create_order(data):
async with db.transaction():
order = await db.insert_order(data)
await db.insert_outbox({
"aggregate_type": "order",
"aggregate_id": order.id,
"event_type": "order.created",
"payload": order.to_dict(),
})
return order
# ATOMIC: order va outbox birga commit bo'ladi
# Relay worker — outbox → Kafka
async def outbox_relay():
while True:
async with db.transaction():
rows = await db.fetch_all("""
SELECT * FROM outbox
WHERE published_at IS NULL
ORDER BY occurred_at
LIMIT 100
FOR UPDATE SKIP LOCKED
""")
for row in rows:
try:
await kafka.send(
topic=f"{row.aggregate_type}.events",
value=row.payload,
key=str(row.aggregate_id),
)
await db.execute(
"UPDATE outbox SET published_at = NOW() WHERE id = $1",
row.id
)
except Exception as e:
await db.execute(
"UPDATE outbox SET retry_count = retry_count + 1 WHERE id = $1",
row.id
)
await asyncio.sleep(1)
# Alternative: Debezium (CDC) — PostgreSQL WAL'dan to'g'ridan to'g'ri o'qib Kafka'ga
Idempotency keys — must have
Har event uchun unique event_id. Consumer tomonda "already processed?" check. Bu event'ni 2 marta olish (at-least-once delivery) — normal; 2 marta ishlatish — bug.
07
real-time systems
7.1
websocket — bidirectional real-time
Chat, live notifications, collaborative editing — qanday qilinadi?
javob
WebSocket vs polling vs SSE — tanlash
Texnika        | Direction            | Overhead           | Use case
---------------|----------------------|--------------------|------------------------------
Short polling  | Client → Server      | Juda yuqori        | Eng oddiy case
Long polling   | Client → Server      | Yuqori             | Legacy fallback
SSE            | Server → Client only | Past               | Notifications, LLM streaming
WebSocket      | Bidirectional        | Eng past           | Chat, collab, games
WebRTC         | P2P                  | Past (after setup) | Video, voice, low-latency
FastAPI WebSocket + Redis pub-sub
ws_chat.py
import asyncio
import json
import time
from fastapi import WebSocket, WebSocketDisconnect
import redis.asyncio as redis
class ConnectionManager:
"""Multi-instance WebSocket via Redis pub-sub"""
def __init__(self):
self.local: dict[str, set[WebSocket]] = {} # room_id -> connections
self.redis = redis.Redis.from_url("redis://redis:6379")
self.pubsub = self.redis.pubsub()
async def connect(self, ws: WebSocket, room_id: str, user_id: str):
await ws.accept()
self.local.setdefault(room_id, set()).add(ws)
# Redis subscription (agar bu birinchi connection shu room'ga)
if len(self.local[room_id]) == 1:
await self.pubsub.subscribe(f"room:{room_id}")
# Join notification (boshqa instance'lar ham eshitadi)
await self.redis.publish(f"room:{room_id}", json.dumps({
"type": "user.joined", "user_id": user_id
}))
async def disconnect(self, ws: WebSocket, room_id: str):
self.local.get(room_id, set()).discard(ws)
if not self.local.get(room_id):
await self.pubsub.unsubscribe(f"room:{room_id}")
async def broadcast_to_room(self, room_id: str, message: dict):
# Redis orqali — hamma instance'ga
await self.redis.publish(f"room:{room_id}", json.dumps(message))
async def redis_listener(self):
"""Redis'dan kelgan message'larni local WebSocket'larga forward"""
async for message in self.pubsub.listen():
if message["type"] != "message":
continue
room_id = message["channel"].decode().split(":")[1]
data = json.loads(message["data"])
for ws in self.local.get(room_id, set()):
try:
await ws.send_json(data)
except Exception:
pass # connection broke
manager = ConnectionManager()
@app.on_event("startup")
async def startup():
asyncio.create_task(manager.redis_listener())
@app.websocket("/ws/room/{room_id}")
async def ws_endpoint(ws: WebSocket, room_id: str):
user = await authenticate_ws(ws) # JWT in query param
await manager.connect(ws, room_id, user.id)
try:
while True:
data = await ws.receive_json()
await manager.broadcast_to_room(room_id, {
"type": "message",
"user_id": user.id,
"content": data["content"],
"timestamp": time.time(),
})
except WebSocketDisconnect:
await manager.disconnect(ws, room_id)
Production considerations
Heartbeat/ping-pong — har 30 sekundda ping, aks holda dead connection sezilmaydi
Sticky sessions — LB (Load Balancer) bir connection'ni bir server'ga yo'naltiradi
Graceful shutdown — deploy vaqtida connection'lar sekin ko'chadi
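Heartbeat g'oyasini taxminan shunday sketch qilish mumkin (FakeWS va qisqa interval — faqat illyustratsiya; realda interval 30 sekund bo'ladi va ping/pong framework'ning o'z mexanizmi orqali yuboriladi):

```python
import asyncio

async def heartbeat(ws, interval: float, max_missed: int = 2) -> str:
    """Har `interval` sekundda ping; ketma-ket `max_missed` ta pong kelmasa — dead."""
    missed = 0
    while missed < max_missed:
        await asyncio.sleep(interval)
        try:
            await asyncio.wait_for(ws.ping(), timeout=interval)
            missed = 0  # pong keldi — hisoblagichni nolga qaytaramiz
        except asyncio.TimeoutError:
            missed += 1
    return "dead"

class FakeWS:
    """Test uchun stub: dastlab 3 marta pong qaytaradi, keyin jim qoladi."""
    def __init__(self, pongs: int):
        self.pongs = pongs
        self.pings_sent = 0
    async def ping(self):
        self.pings_sent += 1
        if self.pongs > 0:
            self.pongs -= 1
            return  # pong keldi
        await asyncio.sleep(3600)  # javob yo'q — wait_for timeout'ga olib keladi

ws = FakeWS(pongs=3)
result = asyncio.run(heartbeat(ws, interval=0.01))
print(result, ws.pings_sent)  # dead 5
```

3 ta muvaffaqiyatli ping + 2 ta javobsiz ping = connection dead deb topiladi.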
7.2
server-sent events (sse) — llm streaming uchun ideal
ChatGPT javobni "yozayotgan"dek ko'rsatadi — bu qanday ishlaydi?
javob
SSE = Server-Sent Events
Server → Client bir tomonlama stream. HTTP/1.1 ustida. WebSocket'dan soddaroq. Auto-reconnect, text-based. LLM response streaming, live notifications, stock tickers uchun — ideal.
llm_sse.py
import json
from fastapi import Request
from fastapi.responses import StreamingResponse
from anthropic import AsyncAnthropic
client = AsyncAnthropic()
@app.post("/api/chat/stream")
async def chat_stream(request: Request, body: ChatRequest):
async def event_stream():
try:
async with client.messages.stream(
model="claude-opus-4-7",
max_tokens=2048,
system=body.system_prompt,
messages=body.messages,
) as stream:
async for text in stream.text_stream:
# Client disconnected?
if await request.is_disconnected():
break
# SSE format: "data: {json}\n\n"
yield f"data: {json.dumps({'type': 'token', 'text': text})}\n\n"
# Final message
final = await stream.get_final_message()
yield f"data: {json.dumps({'type': 'done', 'usage': final.usage.dict()})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # nginx buffering o'chirish
}
)
client.js
// Browser tomon — EventSource API (faqat GET bilan ishlaydi;
// yuqoridagi kabi POST + body stream uchun fetch() + ReadableStream kerak bo'ladi)
const eventSource = new EventSource('/api/chat/stream?q=hello');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'token') {
appendToUI(data.text); // tokenlarni UI'da ko'rsatamiz
} else if (data.type === 'done') {
eventSource.close();
console.log('Usage:', data.usage);
}
};
eventSource.onerror = () => {
// Browser avtomatik reconnect qiladi
console.log('SSE error, retrying...');
};
SSE vs WebSocket tanlash
SSE — qachon
Faqat server → client kerak
LLM streaming
Live notifications
Stock price, sports scores
Auto-reconnect kerak
WebSocket — qachon
Bidirectional kerak (chat)
Low latency critical (games)
Binary data (files)
Collaborative editing
Part III
infrastructure & platform
Python mastery, containers, Kubernetes, cloud, observability. Code yozish — 30%. Qolgan 70% — uni ishga tushirish, monitor qilish, xavfsiz saqlash. 2026-da DevOps — backend'ning ajralmas qismi.
08
python & fastapi mastery
8.1
async/await — event loop mexanizmi
async/await qanday ishlaydi? GIL, threads, processes — qachon qaysi?
javob
Async nima va nima uchun kerak? — Oshpaz analogiyasi
Tasavvur qiling: oshpaz bitta ovqat pishiryapti — 20 daqiqa kutadi, hech narsa qilmaydi. Bu synchronous. Aqlli oshpaz esa: pastani qo'ydi, u qaynayotganda sos tayyorlaydi, keyin non kesadi, taymer chalinganda pastani tekshiradi — bitta oshpaz, bir vaqtda ko'p ish. Bu async.
Python'da: await db.query() — "DB javob berguncha boshqa task'larni bajara beraman". Event loop bitta thread'da yuzlab network request'ni parallel boshqaradi. I/O (network, DB, fayl) kutishi kerak bo'lganda boshqasiga o'tadi. CPU hisoblash (AI model, compression) uchun esa async yetmaydi — multiprocessing kerak.
GIL — Python'ning eng ko'p noto'g'ri tushuniladigan muammosi
Python'da GIL (Global Interpreter Lock) bir vaqtda faqat bitta thread Python kodi bajarishiga ruxsat beradi. Ya'ni threading.Thread bilan CPU hisoblashni parallel qilolmaysiz — thread'lar navbat bilan ishlaydi. Yechim: multiprocessing — har process alohida Python interpreter (alohida GIL). Yoki ProcessPoolExecutor bilan FastAPI'da CPU task'larni pool'ga yuborish. Async I/O ga GIL ta'sir qilmaydi — await vaqtida GIL boshqasiga beriladi.
Event loop — single-thread concurrency
Event loop tushunchasi
Bitta thread. Event loop — cheksiz cycle. "Hozir kim ishga tayyor?" deb so'raydi. await — "men hozir kutaman, boshqa korutin ishlasin". Shuning uchun 1000 ta network request bitta thread'da parallel ishlay oladi.
async vs threading vs multiprocessing
─────────────────────────────────────────
I/O bound (network, DB, file):
async > threading > multiprocessing
1000+ concurrent: async only
CPU bound (math, ML, image processing):
multiprocessing > threading ≈ async
GIL sababli threading CPU'ni bir vaqtda ishlata olmaydi
Mixed (web server):
async (I/O) + process pool (CPU tasks)
uvicorn --workers 4 → 4 process, har birida async loop
async_patterns.py
import asyncio
import aiohttp
from asyncio import Semaphore, TaskGroup
# Pattern 1 — parallel requests with gather
async def fetch_all_fast(urls: list[str]) -> list[dict]:
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks, return_exceptions=True)
return [r for r in responses if not isinstance(r, Exception)]
# Pattern 2 — concurrency limit with Semaphore
async def fetch_with_limit(urls: list[str], limit: int = 10):
sem = Semaphore(limit)
async def bounded_fetch(url):
async with sem: # Max 10 parallel
async with aiohttp.ClientSession() as session:
async with session.get(url) as r:
return await r.json()
return await asyncio.gather(*[bounded_fetch(url) for url in urls])
# Pattern 3 — TaskGroup (Python 3.11+) — structured concurrency
async def process_user_data(user_id: str):
async with TaskGroup() as tg:
profile_task = tg.create_task(fetch_profile(user_id))
orders_task = tg.create_task(fetch_orders(user_id))
prefs_task = tg.create_task(fetch_preferences(user_id))
# Bu yerga yetganda — hammasi yakunlangan
# Birortasi xato bersa — TaskGroup hammasini cancel qiladi
return {
"profile": profile_task.result(),
"orders": orders_task.result(),
"preferences": prefs_task.result(),
}
# Pattern 4 — async + process pool (CPU-bound)
from concurrent.futures import ProcessPoolExecutor
process_pool = ProcessPoolExecutor(max_workers=4)
async def process_image_async(image_path: str):
loop = asyncio.get_event_loop()
# CPU-bound ishni process pool'ga tashlaymiz
result = await loop.run_in_executor(
process_pool,
heavy_image_processing,
image_path
)
return result
# Pattern 5 — timeout bilan
async def fetch_with_timeout(url: str):
try:
async with asyncio.timeout(5): # Python 3.11+
async with aiohttp.ClientSession() as s:
async with s.get(url) as r:
return await r.json()
except asyncio.TimeoutError:
return None
Async anti-patterns — qilmaslik kerak
anti_patterns.py
# ✗ ANTI-PATTERN 1: blocking call async funcda
async def bad_1():
response = requests.get("https://api.example.com") # BLOCKS EVENT LOOP!
return response.json()
# ✓ To'g'ri
async def good_1():
async with aiohttp.ClientSession() as s:
async with s.get("https://api.example.com") as r:
return await r.json()
# ✗ ANTI-PATTERN 2: sync loop async'da
async def bad_2(users):
results = []
for user in users:
r = await fetch_user(user) # sequential!
results.append(r)
return results
# ✓ To'g'ri — parallel
async def good_2(users):
return await asyncio.gather(*[fetch_user(u) for u in users])
# ✗ ANTI-PATTERN 3: CPU-bound async'da
async def bad_3(data):
return heavy_computation(data) # event loop bloklandi
# ✓ To'g'ri
async def good_3(data):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(process_pool, heavy_computation, data)
# ✗ ANTI-PATTERN 4: fire-and-forget without tracking
async def bad_4():
asyncio.create_task(send_email(...)) # reference saqlanmadi — GC yeyishi mumkin!
# ✓ To'g'ri
bg_tasks = set()
async def good_4():
task = asyncio.create_task(send_email(...))
bg_tasks.add(task)
task.add_done_callback(bg_tasks.discard)
8.2
pydantic v2 & modern typing
Pydantic nima va u bilan qanday professional ishlash?
javob
Pydantic v2 — production patterns
models.py
from pydantic import BaseModel, Field, field_validator, model_validator, EmailStr
from pydantic import ConfigDict
from typing import Annotated, Literal
from decimal import Decimal
from datetime import datetime
from uuid import UUID
class OrderItem(BaseModel):
model_config = ConfigDict(
frozen=True, # immutable
str_strip_whitespace=True,
extra="forbid", # noma'lum field → error
)
product_id: UUID
quantity: Annotated[int, Field(gt=0, le=1000)]
unit_price: Annotated[Decimal, Field(gt=0, decimal_places=2)]
class OrderCreate(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")
user_id: UUID
items: Annotated[list[OrderItem], Field(min_length=1, max_length=100)]
shipping_address: str
currency: Literal["USD", "EUR", "GBP", "UZS"] = "USD"
notes: str | None = Field(default=None, max_length=500)
@field_validator("shipping_address")
@classmethod
def validate_address(cls, v: str) -> str:
if len(v.split(",")) < 3:
raise ValueError("Format: street, city, country")
return v
@model_validator(mode="after")
def validate_total(self) -> "OrderCreate":
total = sum(i.quantity * i.unit_price for i in self.items)
if total > Decimal("100000"):
raise ValueError("Order exceeds 100k — manual approval needed")
return self
# Environment config — pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_prefix="APP_")
database_url: str
redis_url: str = "redis://localhost:6379"
openai_api_key: str
secret_key: str = Field(min_length=32)
debug: bool = False
allowed_origins: list[str] = []
@field_validator("database_url")
@classmethod
def validate_db(cls, v: str) -> str:
if not v.startswith(("postgresql://", "postgresql+asyncpg://")):
raise ValueError("Must be PostgreSQL URL")
return v
settings = Settings() # avtomatik .env'dan o'qiydi
Type hints — zamonaviy Python (3.12+)
typing_modern.py
from typing import Protocol, Self
from collections.abc import Awaitable, Callable
# 3.12+ da TypeVar/ParamSpec'ni qo'lda e'lon qilish shart emas —
# PEP 695 sintaksisi (Repository[T], with_retry[T, **P]) ularni o'zi yaratadi
# Generic class
class Repository[T]:
async def find(self, id: UUID) -> T | None: ...
async def save(self, entity: T) -> T: ...
# Protocol — structural typing (duck typing with types)
class Cacheable(Protocol):
def cache_key(self) -> str: ...
def ttl(self) -> int: ...
async def cache_item(item: Cacheable) -> None:
key = item.cache_key()
ttl = item.ttl()
await redis.setex(key, ttl, item.model_dump_json())
# Self type (3.11+)
class QueryBuilder:
def where(self, **kwargs) -> Self:
self._filters.update(kwargs)
return self # typed as Self, so chaining works
def order_by(self, field: str) -> Self:
self._order = field
return self
# Decorator with ParamSpec
def with_retry[T, **P](max_attempts: int = 3):
def decorator(fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
for attempt in range(max_attempts):
try:
return await fn(*args, **kwargs)
except Exception:
if attempt == max_attempts - 1:
raise
await asyncio.sleep(2 ** attempt)
return wrapper
return decorator
8.3
fastapi — zero to production mastery
FastAPI'ni chuqur o'rganib, production-ready tizim qurish — poydevordan to professional darajagacha.
javob
FastAPI — bu nima va u qanday ishlaydi?
FastAPI — Python'da web API yaratish uchun zamonaviy framework. Uning "sehri" uch asosiy ustunga tayanadi:
1. Starlette (ASGI) — async HTTP server framework. Django WSGI'dan farqli: bir vaqtda minglab connection'larni bitta thread'da boshqaradi (event loop). WebSocket, SSE, background task — hammasi native.
2. Pydantic — Python type hint'larini runtime validation'ga aylantiradi. Siz user_id: int deb yozasiz — FastAPI avtomatik: (a) request'dan oladi, (b) int'ga convert qiladi, (c) noto'g'ri bo'lsa 422 qaytaradi, (d) OpenAPI schema'ga qo'shadi. Bu — boshqa framework'larda qo'lda yozilishi kerak bo'lgan yuzlab satr kod.
3. Dependency Injection — har endpoint uchun kerakli narsalarni (DB session, authenticated user, config) "so'rab olish" mexanizmi. Test yozish oson — fake dependency inject qilasiz, haqiqiy DB kerak emas.
Nima uchun Django/Flask o'rniga FastAPI?
Django: full-featured (ORM, admin, auth built-in), lekin sync va og'ir. AI backend uchun juda ko'p "qadoq" ortiqcha.
Flask: yengil, lekin har narsani qo'lda qilish kerak — validation, serialization, async handling, OpenAPI docs. 2015-yilda yaxshi edi, hozir eskirgan.
FastAPI: Flask'ning oddiyligi + Django'ning ishonchliligi + 2020-yillar zamonaviy Python (type hints, async). Benchmark'larda Flask'dan 3-5x tez. OpenAPI docs avtomatik. LLM integratsiyasi uchun eng yaxshi tanlov.
Loyiha strukturasi — production standard
Yaxshi loyiha strukturasi kelajakda juda ko'p muammoni oldini oladi. Quyidagi struktura katta production loyihalarda sinovdan o'tgan — Netflix, Instagram, Beeline backend'larida shunga o'xshash yondashuv ishlatiladi. Har qatlam aniq javobgarlikka ega:
Modular arxitektura — har qatlam nima qiladi?
Yaxshi arxitektura "har narsa o'z joyida" tamoyiliga asoslanadi. Kodingizning har bir qismi aniq javobgarlikka ega bo'lishi, boshqa qismga ortiqcha bog'liq bo'lmasligi kerak. Bu Clean Architecture va Hexagonal Architecture tamoyillari hosilasi.
Nima uchun layerlash muhim?
1-misol: "Order yaratilganda email yuborish" qoidasini o'zgartirish kerak. Layerlanmagan kodda: orders.py, payments.py, api/admin.py — barcha joyda email yuborish kodini topib o'zgartirish kerak. Layerlangan kodda: faqat services/order_service.py — bitta joy, bitta o'zgarish.
2-misol: PostgreSQL'dan MongoDB'ga migratsiya kerak. Layerlanmagan: butun kodni qayta yozish. Layerlangan: faqat repositories/ qatlamini yangidan yozasiz — service va API qatlamlari tegmaydi.
3-misol: Test yozish. Layerlanmagan: har test uchun real DB, real email server ulash kerak. Layerlangan: fake repository inject qilasiz — bir soniyada 1000 ta test.
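3-misoldagi g'oyani kichik sketch'da ko'rish mumkin (FakeOrderRepository gipotetik, interfeys ataylab soddalashtirilgan — real DB, real email server yo'q):

```python
import asyncio
import uuid

class OrderService:
    """Soddalashtirilgan service — DB haqida hech narsa bilmaydi, faqat repo interfeysi."""
    def __init__(self, repo):
        self.repo = repo
    async def create_order(self, user_id: str, total: int) -> dict:
        if total <= 0:
            raise ValueError("total must be positive")
        return await self.repo.save(
            {"id": str(uuid.uuid4()), "user_id": user_id, "total": total}
        )

class FakeOrderRepository:
    """Test uchun fake — saqlanganlarni oddiy list'da ushlab turadi."""
    def __init__(self):
        self.saved: list[dict] = []
    async def save(self, order: dict) -> dict:
        self.saved.append(order)
        return order

repo = FakeOrderRepository()
service = OrderService(repo)
order = asyncio.run(service.create_order("user-1", total=100))
print(len(repo.saved), order["user_id"])  # 1 user-1
```

Service'ga fake inject qilindi — test millisekundlarda, tashqi infra'siz ishlaydi.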
FastAPI request-to-response — to'liq hayot sikli
Tasavvur qiling: foydalanuvchi POST /api/v1/orders so'rovini yubordi. Bitta "klik" ortida 14 ta alohida qadam bajariladi — har biri o'z ishi bor. Quyidagi diagrammada har bir komponent nima qilishi va qayerda nima vaqt ketishi ko'rsatilgan:
Onion model — middleware qanday ishlaydi?
Middleware'lar piyoz qatlamlari kabi ishlaydi. So'rov tashqi qatlamdan kirib, ichki yadroga yetadi — har qatlam kirishda bir narsa qiladi (masalan, auth verify). Javob qaytishda esa har qatlam chiqishda boshqa narsa qiladi (masalan, response header qo'shish). Shuning uchun CORS oxirgi javobga ham ta'sir qiladi, birinchi request'ga ham.
Middleware tartibi muhim: CORS → RequestID → RateLimit → Auth → Handler. Agar RateLimit oldin bo'lsa — authsiz odamlar ham hisobga olinadi. Agar Auth oldin bo'lsa — yuzta bot ham DB'ga murojaat qiladi keyin cheklanadi.
Kod misoli — to'liq modular API
Endi yuqoridagi arxitekturani amalda ko'ramiz. "Order yaratish" endpoint'ini 4 qatlamda yozamiz — kichik qismlarga bo'lib, har biri nima qilishini aniq tushunish uchun.
1. Application entry point — app/main.py
Bu — ilovaning "kirish eshigi". FastAPI instance yaratiladi, resurslar (DB, Redis, Kafka) ishga tushirilib, middleware'lar ulanadi. Avval eng muhim qismi — lifespan funksiyasi:
app/main.py — lifespan (resurs boshqaruvi)
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.config import settings
from app.database import create_db_pool
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: resurslarni bir marta yaratamiz
app.state.db_pool = await create_db_pool(settings.DATABASE_URL)
app.state.redis = await create_redis_pool(settings.REDIS_URL)
app.state.kafka = await create_kafka_producer(settings.KAFKA_URL)
yield # ← Shu yerda ilova ishlaydi
# Shutdown: resurslarni toza yopamiz
await app.state.kafka.stop()
await app.state.redis.close()
await app.state.db_pool.close()
Nima qilyapmiz? lifespan — maxsus async context manager. yield'gacha bo'lgan kod app ishga tushganda bir marta ishlaydi, yield'dan keyingi kod — app to'xtaganda (graceful shutdown). Bu muhim: connection pool'larni bir marta yaratamiz, har request'da qayta yaratmaymiz. app.state — FastAPI'ning global saqlash joyi; undagi narsalarga middleware va endpoint'lardan kirish mumkin.
Keyin FastAPI obyektini yaratamiz va middleware'larni qo'shamiz. Tartib muhim:
app/main.py — middleware stack
from fastapi.middleware.cors import CORSMiddleware
from app.middleware import RequestIDMiddleware, LoggingMiddleware
from app.api.v1.router import api_router
app = FastAPI(
title=settings.APP_NAME,
version=settings.VERSION,
lifespan=lifespan,
docs_url="/docs" if settings.DEBUG else None,
)
# Tartib: oxirgi qo'shilgan — birinchi ishga tushadi (onion model)
app.add_middleware(RequestIDMiddleware) # eng ichki
app.add_middleware(LoggingMiddleware)
app.add_middleware(CORSMiddleware, # eng tashqi
allow_origins=settings.ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
)
app.include_router(api_router, prefix="/api/v1")
Piyoz (onion) tamoyili: FastAPI middleware'larni teskari tartibda qo'llaydi — oxirgi qo'shilgan eng tashqi qatlam bo'ladi va birinchi ishga tushadi. CORS eng oxirida qo'shilgani uchun birinchi bo'lib ishlaydi (yaxshi, chunki CORS'ni so'rov kirishidanoq tekshirish kerak). RequestID esa eng ichki qatlam — har so'rovga unique ID beradi va keyin log'larda shu ID bo'yicha kuzatish mumkin.
2. API qatlami — app/api/v1/orders.py
Bu qatlam faqat HTTP bilan gaplashadi. Biznes logika bu yerda yo'q. Handler maksimal yupqa bo'lishi kerak — faqat request'dan data'ni olish, service'ni chaqirish, response qaytarish:
app/api/v1/orders.py — create endpoint
from fastapi import APIRouter, Depends, status
from app.models.schemas import OrderCreate, OrderResponse
from app.services.order_service import OrderService
from app.dependencies import get_order_service, get_current_user
from app.models.domain import User
router = APIRouter(prefix="/orders", tags=["orders"])
@router.post("", response_model=OrderResponse, status_code=201)
async def create_order(
data: OrderCreate,
current_user: User = Depends(get_current_user),
service: OrderService = Depends(get_order_service),
) -> OrderResponse:
order = await service.create_order(data=data, user=current_user)
return OrderResponse.from_domain(order)
Diqqat qiling: handler faqat 2 qatorlik ish qiladi — service'ni chaqiradi va response qaytaradi. Qolgan hammasini FastAPI o'zi qiladi: data: OrderCreate — request body'ni Pydantic bilan avtomatik validate qiladi; Depends(get_current_user) — JWT token'ni tekshirib user'ni DB'dan topib beradi; response_model=OrderResponse — javobni Pydantic schema bo'yicha serialize qiladi. Siz yozmagan, lekin hammasi ishlaydi.
Ro'yxat endpointi biroz murakkabroq — pagination kerak:
app/api/v1/orders.py — list endpoint
@router.get("", response_model=OrderListResponse)
async def list_orders(
pagination: PaginationParams = Depends(get_pagination),
current_user: User = Depends(get_current_user),
service: OrderService = Depends(get_order_service),
) -> OrderListResponse:
orders, total = await service.list_user_orders(
user_id=current_user.id,
limit=pagination.limit,
offset=pagination.offset,
)
return OrderListResponse(
data=[OrderResponse.from_domain(o) for o in orders],
total=total,
page=pagination.page,
)
Pagination ham Depends orqali keladi — ?page=2&per_page=20 query parametrlarni olib, validate qilib, tayyor obyekt beradi. Bu pattern har endpoint'da qayta ishlatiladi — bir marta yozib, hamma joyda kerak bo'lganda Depends(get_pagination).
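get_pagination'ning o'zi hujjatda ko'rsatilmagan; taxminiy mantiqi quyidagicha bo'lishi mumkin (nomlar shartli; FastAPI'da bu Query(...) validatsiyasi bilan yoziladi, bu yerda mantiq sof Python'da):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class PaginationParams:
    page: int
    per_page: int

    @property
    def limit(self) -> int:
        return self.per_page

    @property
    def offset(self) -> int:
        # page 1-based: 2-sahifa, per_page=20 → birinchi 20 tani o'tkazib yuboramiz
        return (self.page - 1) * self.per_page

def get_pagination(page: int = 1, per_page: int = 20) -> PaginationParams:
    # FastAPI'da: page: int = Query(1, ge=1), per_page: int = Query(20, ge=1, le=100)
    page = max(page, 1)
    per_page = min(max(per_page, 1), 100)  # abuse'dan himoya: max 100
    return PaginationParams(page=page, per_page=per_page)

p = get_pagination(page=2, per_page=20)
print(p.limit, p.offset)  # 20 20
```

Endpoint'da Depends(get_pagination) deyilganda FastAPI query parametrlarni shu funksiyaga uzatadi va tayyor obyekt qaytaradi.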
3. Service qatlami — app/services/order_service.py
Bu qatlam — ilovaning yuragi. Butun biznes logika shu yerda jamlanadi. HTTP haqida hech narsa bilmaydi — agar ertaga API'dan voz kechib CLI yoki GraphQL ga o'tsak, service tegmaydi. Avval konstruktor:
order_service.py — dependency injection
from decimal import Decimal
from app.repositories.order_repo import OrderRepository
from app.repositories.product_repo import ProductRepository
from app.repositories.wallet_repo import WalletRepository
from app.services.event_bus import EventBus
from app.models.domain import Order, User, OrderStatus
from app.models.schemas import OrderCreate
from app.exceptions import NotFoundError, InsufficientStockError, InsufficientFundsError
class OrderService:
def __init__(
self,
order_repo: OrderRepository,
product_repo: ProductRepository,
wallet_repo: WalletRepository,
event_bus: EventBus,
):
self.orders = order_repo
self.products = product_repo
self.wallets = wallet_repo
self.events = event_bus
Nima uchun shunday? Service hech narsani o'zi yaratmaydi — barcha bog'liqliklar tashqaridan beriladi (Dependency Injection). Bu testing uchun muhim: test vaqtida haqiqiy DB o'rniga FakeOrderRepository inject qilish mumkin. Bundan tashqari, agar ertaga PostgreSQL'dan MongoDB'ga o'tsak, faqat repository'ni almashtirish kifoya — service kodida o'zgarish bo'lmaydi.
Keyin asosiy metod — buyurtma yaratish. Qadamlarga bo'lib ko'ramiz. Birinchi, mahsulotni tekshirish:
order_service.py — 1. validation
async def create_order(self, data: OrderCreate, user: User) -> Order:
# 1. Mahsulot bor-yo'qligini tekshirish
product = await self.products.get_by_id(data.product_id)
if not product:
raise NotFoundError(f"Product {data.product_id} not found")
# 2. Omborda yetarli ekanligini tekshirish
if product.stock < data.quantity:
raise InsufficientStockError(
f"Only {product.stock} available, requested {data.quantity}"
)
Birinchi qadam — ma'lumotlar to'g'riligini tekshirish. Mahsulot bormi? Ombor yetarlimi? Agar biror tekshiruvdan o'tmasa — domain exception otiladi. Bu exception'lar keyinchalik API qatlamida 404 yoki 409 response'ga aylantiriladi, lekin service o'zi HTTP status code haqida hech narsa bilmaydi.
Ikkinchi qadam — biznes qoidalarini qo'llash (narx va chegirma):
order_service.py — 2. pricing
# 3. Narx va chegirma hisoblash
total = product.price * data.quantity
if user.is_premium:
total *= Decimal("0.9") # Premium foydalanuvchilarga 10% chegirma
# 4. Hamyondagi pul yetarli ekanligini tekshirish
wallet = await self.wallets.get_by_user(user.id)
if wallet.balance < total:
raise InsufficientFundsError(
f"Balance: {wallet.balance}, required: {total}"
)
Diqqat qiling: chegirma qoidasi (premium users get 10% off) faqat shu yerda. API handlerda yo'q, controllerda yo'q, repository'da yo'q. Agar ertaga chegirma 15% bo'lsa yoki yangi tier qo'shilsa — faqat shu joyni o'zgartiramiz. Bu — single source of truth tamoyili.
Uchinchi qadam — saqlash, lekin eng muhim jihati bilan: transaction ichida. Bu yerda atomicity muhim — yoki hammasi saqlanadi, yoki hech biri:
order_service.py — 3. atomic transaction
# 5. Transaction ichida saqlash (ATOMIC!)
async with self.orders.transaction():
order = await self.orders.create(
user_id=user.id,
product_id=data.product_id,
quantity=data.quantity,
total=total,
status=OrderStatus.PENDING,
)
await self.products.decrement_stock(data.product_id, data.quantity)
await self.wallets.debit(user.id, total)
Nima uchun transaction muhim? Tasavvur qiling: buyurtma yaratildi, stok kamaytirildi, lekin hamyondan pul olish vaqtida xato chiqdi. Transactionsiz: buyurtma bor, stok kamaygan, lekin pul o'tmagan — mijoz tekin mahsulot oladi. Transaction bilan: yoki uch amal ham muvaffaqiyatli bajariladi, yoki bironi ham bajarilmaydi. Bu — ACID'ning Atomicity xossasi.
Nega event kerak? Buyurtma yaratilishi natijasida ko'p narsa bo'lishi kerak: mijozga email, omborxonaga notifikatsiya, ML model'ga data, analytics'ga metric. Agar hammasini shu metod ichida sinxron qilsak — 5 sekund kutadi foydalanuvchi. Event bilan: service faqat "buyurtma yaratildi" deb e'lon qiladi, qolganlar o'z vaqtida, o'z tezligida bajaradi. Bu — event-driven architecture.
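Service'ning yakuniy qadami — event e'lon qilish — taxminan shunday ko'rinadi (gipotetik soddalashtirilgan in-process EventBus bilan; realda bu Kafka yoki Redis pub-sub bo'lardi, funksiya nomi shartli):

```python
import asyncio

class EventBus:
    """Minimal in-process event bus — faqat g'oyani ko'rsatish uchun."""
    def __init__(self):
        self.handlers: dict[str, list] = {}
    def subscribe(self, event_type: str, handler):
        self.handlers.setdefault(event_type, []).append(handler)
    async def publish(self, event_type: str, payload: dict):
        for handler in self.handlers.get(event_type, []):
            await handler(payload)

received: list[dict] = []

async def send_confirmation_email(payload: dict):
    # Email service — order service'dan tashqarida, o'z tezligida ishlaydi
    received.append(payload)

events = EventBus()
events.subscribe("order.created", send_confirmation_email)

async def create_order_final_step(order_id: str, total: int):
    # ... transaction commit bo'ldi (5-qadam) ...
    # 6. E'lon: service faqat "buyurtma yaratildi" deydi, qolganini subscriber'lar qiladi
    await events.publish("order.created", {"order_id": order_id, "total": total})

asyncio.run(create_order_final_step("ord-1", 100))
print(received)  # [{'order_id': 'ord-1', 'total': 100}]
```

Service email, analytics yoki ombor haqida bilmaydi — faqat event chiqaradi.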
4. Repository qatlami — app/repositories/order_repo.py
Bu qatlamning yagona vazifasi — DB bilan ishlash. Hech qanday biznes logika yo'q. Faqat SQL query, transaction, caching. Agar service "buyurtmani saqla" desa — repository aynan saqlaydi, nima uchunligini so'ramaydi. Avval sinf:
order_repo.py — struktura
import asyncio
from contextlib import asynccontextmanager
from decimal import Decimal
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, func
from app.models.domain import Order, OrderStatus
class OrderRepository:
def __init__(self, session: AsyncSession):
self.session = session
@asynccontextmanager
async def transaction(self):
"""Atomic operation uchun transaction wrapper"""
async with self.session.begin():
yield
Repository SQLAlchemy session ni oladi. Session — bu bitta request doirasidagi DB context. U orqali SELECT, INSERT, UPDATE qilinadi. transaction() metodi — context manager. Uni service'ning async with bloki ichida ishlatganda, ichidagi barcha amallar bir transaction'da bajariladi.
Yangi buyurtma saqlash metodi:
order_repo.py — create
async def create(
self,
user_id: UUID,
product_id: UUID,
quantity: int,
total: Decimal,
status: OrderStatus,
) -> Order:
order = Order(
user_id=user_id,
product_id=product_id,
quantity=quantity,
total=total,
status=status,
)
self.session.add(order)
await self.session.flush() # INSERT execute, lekin commit emas
await self.session.refresh(order) # DB'dan id, created_at ni olish
return order
flush() va commit() farqini bilish muhim: flush — SQL'ni DB'ga yuboradi, lekin transactionni yopmaydi (id, timestamp default qiymatlari tayyor bo'ladi, lekin rollback qilish hali mumkin). Commit — transactionni yakunlaydi. Biz service'dagi async with transaction() bloki tugaganda — o'shanda avtomatik commit bo'ladi. Bu yondashuv: birinchi flush, tekshiruv, keyin commit — xavfsizroq.
Ro'yxat olish metodi — ikki alohida query bilan (data va count):
order_repo.py — list with pagination
async def list_by_user(
self, user_id: UUID, limit: int = 20, offset: int = 0,
) -> tuple[list[Order], int]:
# Data query
data_query = (
select(Order)
.where(Order.user_id == user_id)
.order_by(Order.created_at.desc())
.limit(limit).offset(offset)
)
# Count query (pagination uchun total kerak)
count_query = (
select(func.count(Order.id))
.where(Order.user_id == user_id)
)
# Bitta AsyncSession'da ketma-ket execute qilamiz —
# SQLAlchemy AsyncSession bir session ustida concurrent operatsiyaga ruxsat bermaydi
data_result = await self.session.execute(data_query)
count_result = await self.session.execute(count_query)
orders = list(data_result.scalars().all())
total = count_result.scalar_one()
return orders, total
Nima uchun ikki alohida query? Birinchisi 20 ta buyurtmani oladi, ikkinchisi pagination uchun umumiy sonni. Muhim nuans: bitta AsyncSession ustida asyncio.gather bilan parallel execute qilib bo'lmaydi — SQLAlchemy session bir vaqtda faqat bitta operatsiyani qo'llab-quvvatlaydi. Haqiqiy parallelizm kerak bo'lsa, ikkita alohida session ochiladi yoki ikkala natija bitta query'da (masalan, count(*) OVER () window function bilan) olinadi.
5. Dependency graph — app/dependencies.py
FastAPI'ning eng kuchli xususiyati — avtomatik dependency resolution. Siz shunchaki "menga OrderService kerak" deysiz, FastAPI o'zi: avval session'ni yaratadi, undan OrderRepository, ProductRepository va WalletRepository'ni, ular va EventBus'dan esa OrderService'ni qurib chiqaradi. Bu graph'ni FastAPI avtomatik quradi.
Birinchi — eng asosiy dependency, DB session:
dependencies.py — DB session
from fastapi import Depends, Request
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import SessionLocal
async def get_db() -> AsyncSession:
async with SessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
Bu — generator funksiya (yield ishlatilgan, return emas). yield'gacha bo'lgan qism request boshlanganda ishlaydi — session yaratiladi. yield'dan keyingi qism request tugagach ishlaydi — xato bo'lmagan bo'lsa commit, xato bo'lsa rollback. Bu pattern har endpoint uchun avtomatik transaction management beradi.
Foydalanuvchini JWT'dan olish:
dependencies.py — authenticated user
from uuid import UUID
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi import HTTPException
import jwt
from app.config import settings
from app.models.domain import User
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db),
) -> User:
try:
payload = jwt.decode(
credentials.credentials,
settings.JWT_SECRET,
algorithms=[settings.JWT_ALGORITHM],
)
except jwt.ExpiredSignatureError:
raise HTTPException(401, "Token expired")
except jwt.InvalidTokenError:
raise HTTPException(401, "Invalid token")
user = await db.get(User, UUID(payload["sub"]))
if not user:
raise HTTPException(401, "User not found")
return user
This function depends on two things: HTTPBearer (pulls the token out of the Authorization header) and get_db (to look the user up in the DB). It is written so FastAPI can resolve all of this automatically — whenever an endpoint asks for Depends(get_current_user), FastAPI first creates the DB session, then reads and validates the token, and returns the user fetched from the DB.
The services themselves are assembled from several repositories:
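The composition can be sketched like this (class names are illustrative stand-ins, and the FastAPI `Depends(get_db)` wiring is omitted so the snippet runs standalone):

```python
# A sketch of service composition — class names are illustrative stand-ins
class OrderRepository:
    def __init__(self, session): self.session = session

class ProductRepository:
    def __init__(self, session): self.session = session

class WalletRepository:
    def __init__(self, session): self.session = session

class OrderService:
    def __init__(self, order_repo, product_repo, wallet_repo):
        self.order_repo = order_repo
        self.product_repo = product_repo
        self.wallet_repo = wallet_repo

def build_order_service(session) -> OrderService:
    # Mirrors what FastAPI does per request: get_db() is resolved once,
    # and every repository is constructed around that same session.
    return OrderService(
        order_repo=OrderRepository(session),
        product_repo=ProductRepository(session),
        wallet_repo=WalletRepository(session),
    )

svc = build_order_service(session=object())
# All three repositories share one session → one connection, one transaction
assert svc.order_repo.session is svc.product_repo.session is svc.wallet_repo.session
```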
Note: all three repositories receive the same session. With FastAPI, get_db() is called once per request, its result is cached, and the same session is used everywhere. This matters — a transaction must live on a single connection. If each repository got its own session, the transaction would not work.
6. Pydantic schemas — app/models/schemas.py
Schemas are the API contract. They come in two kinds: input (client → server) and output (server → client). Keeping them separate from the DB models is a hard rule:
schemas.py — Input
from pydantic import BaseModel, Field
from uuid import UUID
class OrderCreate(BaseModel):
"""Data coming from the client — only the fields it may set"""
product_id: UUID
quantity: int = Field(..., ge=1, le=100, description="between 1 and 100")
Note that OrderCreate has no user_id. This is for security: a user cannot submit their own ID — we take it from the JWT. If user_id were a field, an attacker could create orders on behalf of another user. There is no total either — the server computes it (premium discount, etc.). The client sends only what it is supposed to know.
schemas.py — Output
from pydantic import ConfigDict
from datetime import datetime
from decimal import Decimal
from typing import Self
from app.models.domain import Order, OrderStatus
class OrderResponse(BaseModel):
"""Data returned to the client"""
model_config = ConfigDict(from_attributes=True)
id: UUID
product_id: UUID
quantity: int
total: Decimal
status: OrderStatus
created_at: datetime
# user_id is not returned — the user knows it is theirs
# internal_notes is not returned — that is admin-only
@classmethod
def from_domain(cls, order: Order) -> Self:
return cls.model_validate(order)
from_attributes=True tells Pydantic: "read the attributes straight off the ORM object". So an OrderResponse can be built from an Order ORM object with model_validate. Even if the DB row has password_hash or internal_notes fields, they are not declared here, so they never reach the response. This is the key security principle: when you return data to a user, allow-list every field explicitly, otherwise hidden data can leak out by accident.
7. Testing — unit tests with mocks
The biggest payoff of a modular architecture is fast, reliable testing. We can test the service layer without a real DB, thousands of runs per second. First, the fixtures:
test_order_service.py — setup
import pytest
from decimal import Decimal
from unittest.mock import AsyncMock
from app.services.order_service import OrderService
from app.models.schemas import OrderCreate
from app.models.domain import Product, User, Wallet
from app.exceptions import InsufficientStockError
@pytest.fixture
def mock_dependencies():
"""All dependencies as fake objects"""
return {
"order_repo": AsyncMock(),
"product_repo": AsyncMock(),
"wallet_repo": AsyncMock(),
"event_bus": AsyncMock(),
}
@pytest.fixture
def service(mock_dependencies):
return OrderService(**mock_dependencies)
AsyncMock is a fake object whose methods are all async. For each method we decide what it returns. It stands in for the real DB: it answers in a millisecond, writes nothing to disk, and tests cannot interfere with each other.
The AAA pattern (Arrange-Act-Assert) is the classic way to structure a test. Arrange: set up the fakes and their return values. Act: call the method under test. Assert: check the result and the calls that were made. assert_called_with answers "was this method called with exactly these arguments?" — it verifies the integration.
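A first, happy-path test following AAA might look like this (a self-contained sketch: the real classes live in app/ — minimal stand-ins are inlined here so the snippet runs on its own):

```python
import asyncio
from decimal import Decimal
from unittest.mock import AsyncMock

# Minimal stand-ins so the sketch runs on its own; the real classes live in app/
class Product:
    def __init__(self, id, price, stock):
        self.id, self.price, self.stock = id, price, stock

class OrderService:
    def __init__(self, product_repo, order_repo):
        self.product_repo, self.order_repo = product_repo, order_repo

    async def create_order(self, product_id: str, quantity: int) -> Decimal:
        product = await self.product_repo.get_by_id(product_id)
        total = product.price * quantity
        await self.order_repo.save({"product_id": product_id, "total": total})
        return total

async def test_creates_order_with_correct_total():
    # Arrange — fake repos answer instantly, nothing touches a DB
    product_repo, order_repo = AsyncMock(), AsyncMock()
    product_repo.get_by_id.return_value = Product("p1", Decimal("100"), stock=10)
    service = OrderService(product_repo, order_repo)
    # Act
    total = await service.create_order("p1", quantity=2)
    # Assert — the result AND the calls that were made
    assert total == Decimal("200")
    order_repo.save.assert_called_once()
    return order_repo

order_repo = asyncio.run(test_creates_order_with_correct_total())
```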
Second test — the exception scenario:
test — stock yetmaganda
async def test_raises_if_stock_insufficient(service, mock_dependencies):
mock_dependencies["product_repo"].get_by_id.return_value = Product(
id="p1", price=Decimal("100"), stock=2 # only 2 in stock
)
# Quantity 5 — more than the stock
with pytest.raises(InsufficientStockError):
await service.create_order(
OrderCreate(product_id="p1", quantity=5),
User(id="u1", is_premium=False),
)
This test guards our rule: "orders above available stock are rejected". If someone later changes the rule incorrectly, the test fails and blocks the commit.
Third test — the premium discount rule:
One test, one conclusion: each test checks exactly one business rule. "Premium users get a 10% discount" is one test. "Insufficient stock raises an exception" is another. That way, when a test fails, you immediately know which rule broke.
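A sketch of that third test, with the assumed rule "premium users pay 90%" inlined as a stand-in service so the snippet runs standalone:

```python
import asyncio
from decimal import Decimal
from unittest.mock import AsyncMock

# Stand-in service inlining the assumed rule: premium users get 10% off
class OrderService:
    def __init__(self, product_repo):
        self.product_repo = product_repo

    async def create_order(self, product_id: str, quantity: int, is_premium: bool) -> Decimal:
        product = await self.product_repo.get_by_id(product_id)
        total = product.price * quantity
        if is_premium:
            total *= Decimal("0.9")  # the business rule under test
        return total

async def test_premium_user_gets_ten_percent_discount():
    repo = AsyncMock()
    repo.get_by_id.return_value = type("P", (), {"price": Decimal("100")})()
    service = OrderService(repo)
    total = await service.create_order("p1", quantity=1, is_premium=True)
    assert total == Decimal("90")  # exactly one rule checked per test
    return total

premium_total = asyncio.run(test_premium_user_gets_ten_percent_discount())
```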
✓
FastAPI mastery — core principles
1. Keep handlers thin. No business logic in the API layer — just call the service. If a handler exceeds 20 lines, move logic into the service.
2. Use dependency injection everywhere. Every resource (DB, cache, service) arrives via Depends() — swapping in fakes for tests becomes easy.
3. Pydantic schema = API contract. Separate from the DB model. Input and output schemas can also be separate.
4. Exceptions are born in the domain. The service raises InsufficientStockError — a global exception handler in the API layer turns it into a 409 response.
5. Test unit + integration. Unit tests exercise the service with fake repos (fast); integration tests use a real DB (slow, but real).
09
devops & cloud — docker, kubernetes, ci/cd
9.1
docker — multi-stage, security, optimization
How do you write a production-grade Dockerfile?
answer
Dockerfile
# syntax=docker/dockerfile:1.7
# Stage 1: builder
FROM python:3.12-slim AS builder
WORKDIR /build
# System deps needed only for the build
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# uv — the fast modern package manager (the 2026 default)
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uv
# Dependencies as a separate layer (cache optimization)
COPY pyproject.toml uv.lock ./
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --no-dev --no-install-project
# Stage 2: runtime
FROM python:3.12-slim AS runtime
# Non-root user
RUN groupadd -r app && useradd -r -g app -d /app -s /bin/bash app
WORKDIR /app
# System deps (minimal)
RUN apt-get update && apt-get install -y --no-install-recommends \
libpq5 curl \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
# Copy dependencies from the builder stage
COPY --from=builder /build/.venv /app/.venv
ENV PATH="/app/.venv/bin:$PATH"
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
# App code
COPY --chown=app:app . .
USER app
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=5s --start-period=30s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
Docker optimization tips
Multi-stage build — only what the runtime needs ends up in the image. 500 MB → 150 MB.
Kubernetes (K8s) is a container orchestration system. Imagine: 50 servers, 200 containers, each with different CPU/memory needs; traffic suddenly grows 10x; one server dies — managing this by hand is impossible. K8s does it automatically: it places containers onto servers (scheduling), restarts dead containers (self-healing), creates new replicas when traffic grows (auto-scaling), and removes them when traffic drops (scale down).
Declarative model: you describe the desired state (in YAML), and K8s drives the system to that state and keeps it there. "There must be 3 replicas" → if one dies, K8s immediately starts a replacement.
K8s resource'lar — hierarchy
Pod ← smallest unit, 1+ containers
↑
Deployment ← declarative pod management, rolling update
↑
Service ← network endpoint, load balance
↑
Ingress ← external HTTP routing
+ ConfigMap, Secret ← configuration
+ HPA ← auto scaling
+ PodDisruptionBudget ← availability during deployments
+ NetworkPolicy ← network isolation
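The resources above are declared in YAML. A minimal Deployment sketch of the declarative model (the name, image, and resource numbers are placeholders, not from a real cluster):

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api
spec:
  replicas: 3                 # desired state — K8s keeps 3 pods alive
  selector:
    matchLabels: {app: api}
  template:
    metadata:
      labels: {app: api}
    spec:
      containers:
        - name: api
          image: registry.example.com/api:1.0.0   # placeholder image
          ports: [{containerPort: 8000}]
          resources:
            requests: {cpu: 250m, memory: 256Mi}
            limits: {cpu: "1", memory: 512Mi}
          readinessProbe:
            httpGet: {path: /health, port: 8000}
```

Apply it with kubectl apply, delete a pod by hand, and K8s recreates it — that is the reconciliation loop in action.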
Observability — distributed tracing with OpenTelemetry
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Setup
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(
OTLPSpanExporter(endpoint="http://otel-collector:4317")
))
trace.set_tracer_provider(provider)
# Auto-instrument frameworks
FastAPIInstrumentor.instrument_app(app)
SQLAlchemyInstrumentor().instrument(engine=engine)
HTTPXClientInstrumentor().instrument()
# Custom spans
tracer = trace.get_tracer(__name__)
@app.post("/orders")
async def create_order(data: OrderCreate):
with tracer.start_as_current_span("create_order") as span:
span.set_attribute("user.id", str(data.user_id))
span.set_attribute("order.items_count", len(data.items))
with tracer.start_as_current_span("validate_inventory"):
await check_inventory(data.items)
with tracer.start_as_current_span("charge_payment"):
charge = await stripe_charge(data.total)
span.set_attribute("charge.id", charge.id)
with tracer.start_as_current_span("save_order"):
order = await db.save_order(data)
span.set_attribute("order.id", str(order.id))
return order
SRE practices — chaos engineering
✦
Chaos engineering — production standard
It started with Netflix's Chaos Monkey. The idea: deliberately inject failures in production. Region down, DB lag, network partition. Tools: Chaos Mesh, Litmus, Gremlin. The point: resilience cannot be fully tested, but it can be simulated.
Incident response — production runbook
An alert fires (PagerDuty/Opsgenie):
─────────────────────────────────────────
1. ACKNOWLEDGE (<2 min) — acknowledge the alert
2. ASSESS (<5 min) — severity, scope, user impact
3. COMMUNICATE — status page, Slack #incident
4. MITIGATE — stop the bleeding (rollback > fix-forward)
5. VERIFY — metrics back to green
6. POSTMORTEM (24-48h) — blameless root cause
Severity levels:
SEV-1: major outage, >25% users → all hands, status page
SEV-2: partial outage, feature broken → on-call team
SEV-3: minor issue, workaround exists → working hours
Part IV
ai architecture — llm, rag, agents, advanced patterns
Modern AI is not a passive chatbot but an active working component. LLM fundamentals, every RAG architecture, vector search, fine-tuning, agentic AI, multi-agent systems, MCP, GraphRAG, AI memory, SLMs and MoE — the complete AI engineering section. This part is the heart of the whole guide.
11
llm fundamentals — how language models work
11.1
how an llm works — the engineering perspective
An LLM is not a black box — you need to understand it. How does it work?
answer
An LLM is a next-word predictor — but a very smart one
An LLM (Large Language Model) is a very large (100B+ parameter) transformer neural network. Its core task is simple: given a text (the context), predict which token comes next. "Paris is the capital of __" → "France" with 99.8% probability. One token is predicted, appended to the context, and the next token is predicted. This cycle is called autoregressive generation.
The paradox: by learning this simple task on trillions of tokens of internet text, the model incidentally acquires abilities that look like understanding — reasoning, arithmetic, writing code, translation. This is why LLMs show emergent capabilities: things they were never directly trained for, yet can do.
Context window — the LLM's working memory
An LLM does not memorize the conversation. It only "sees" the text in the current context window and responds to that. On every new API call you send the entire conversation history (the messages array) — for the LLM, each call is a fresh start. A 128K-token context window ≈ 100K words ≈ a 300-page book — large, yet still finite. Long conversation → token limit → older messages may have to be truncated.
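In practice that means the client resends the whole messages array on every turn — a minimal sketch (the lambda stands in for a real client.messages.create call):

```python
# Each API call carries the FULL history; the model has no memory between calls.
history = []

def ask(user_text: str, fake_model) -> str:
    history.append({"role": "user", "content": user_text})
    reply = fake_model(history)  # stand-in for client.messages.create(messages=history)
    history.append({"role": "assistant", "content": reply})
    return reply

# A fake "model" that just reports how much context it was given
ask("hello", lambda msgs: f"I can see {len(msgs)} message(s)")
ask("and now?", lambda msgs: f"I can see {len(msgs)} message(s)")

# 2 user turns + 2 assistant turns — and all four are resent on the next call
assert len(history) == 4
```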
Token — the LLM's unit of text
"Hello, world!" → tokenizer → [15496, 11, 1917, 0]
↑
each token is ~3-4 characters — a whole word
or a fragment of one. BPE algorithm.
1 English word ≈ 1.3 tokens
1 Uzbek word ≈ 2.5 tokens (Turkic languages tokenize poorly)
1000 words ≈ 1300 tokens (EN) or ~2500 tokens (UZ)
Pricing is usually per 1M tokens:
- Input: $3-15
- Output: $15-75 (much more expensive!)
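Those ratios make cost estimation plain arithmetic — a sketch with assumed prices ($3/M input, $15/M output; check your provider's pricing page):

```python
def estimate_cost(input_tokens: int, output_tokens: int,
                  in_price_per_m: float = 3.0, out_price_per_m: float = 15.0) -> float:
    """Cost in USD; default prices are illustrative assumptions."""
    return (input_tokens / 1e6) * in_price_per_m + (output_tokens / 1e6) * out_price_per_m

# ~1000 English words in, ~500 words out ≈ 1300 + 650 tokens
cost = estimate_cost(1300, 650)
# Output tokens dominate the bill despite being fewer
assert 0.013 < cost < 0.014
```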
Key parameters in an API call
llm_params.py
from anthropic import AsyncAnthropic
client = AsyncAnthropic()
response = await client.messages.create(
model="claude-opus-4-7", # model choice
max_tokens=1024, # response limit (stop cap)
temperature=0.7, # 0.0 = deterministic, 1.0 = creative
top_p=0.95, # nucleus sampling
system="Sen professional Python dasturchisisan.", # persona
messages=[
{"role": "user", "content": "Fibonacci function yoz."}
],
)
# Key parameters:
# temperature — creativity level
# 0.0: identical answer every time (code, facts)
# 0.3-0.7: professional work
# 1.0+: creative writing, brainstorming
#
# top_p — probability mass
# 0.9: only tokens within the top 90% of probability
#
# max_tokens — how many tokens to generate
# A bigger budget → more expensive and slower
Context window — the model's memory
Model             | Context window | Output max | Notes
GPT-4o            | 128k tokens    | 16k        | OpenAI
Claude Opus 4.7   | 200k tokens    | 8k         | Anthropic, reasoning king
Claude Sonnet 4.6 | 200k           | 8k         | Balanced
Gemini 2.5 Pro    | 2M tokens      | 8k         | Longest context
Llama 3.3 70B     | 128k           | —          | Open-weight, self-host
Mistral Large 2   | 128k           | —          | EU, open weights
▲
Context window != context quality
A 2M context ≠ 2M of effective attention. The "lost in the middle" phenomenon: models use information in the middle of the context worse. 50k of relevant context via RAG retrieval beats 2M of random text. Context management matters — this is not just a speed/price tradeoff.
Prompt engineering — practical patterns
prompts.py
# PATTERN 1 — Role + Task + Format + Examples
SYSTEM_PROMPT = """Sen senior Python backend dasturchi va code reviewer.
Vazifang:
- Berilgan Python kodni ko'rib chiqish
- Security, performance, best practices muammolarini topish
- Har muammo uchun tuzatilgan versiya berish
Javob formati (JSON):
{
"issues": [
{
"severity": "critical|high|medium|low",
"category": "security|performance|style|bug",
"description": "...",
"line": 15,
"suggested_fix": "..."
}
],
"summary": "umumiy baholash"
}
Misollar:
- SQL injection topsang → severity: "critical", category: "security"
- N+1 query topsang → severity: "high", category: "performance"
"""
# PATTERN 2 — Structured output with Pydantic
from typing import Literal
from pydantic import BaseModel
class Issue(BaseModel):
severity: Literal["critical", "high", "medium", "low"]
category: Literal["security", "performance", "style", "bug"]
description: str
line: int
suggested_fix: str
class ReviewResult(BaseModel):
issues: list[Issue]
summary: str
# Instructor library bilan (Pydantic validate)
import instructor
client = instructor.from_anthropic(AsyncAnthropic())
result: ReviewResult = await client.messages.create(
model="claude-opus-4-7",
response_model=ReviewResult, # auto-validate
messages=[{"role": "user", "content": code_to_review}]
)
# PATTERN 3 — Chain of Thought
COT_PROMPT = """Muammo: {problem}
Qadam-qadam o'ylang:
1. Masala nima haqida?
2. Qanday ma'lumotlar berilgan?
3. Qanday yondashuvlar mavjud?
4. Har birini qisqacha baholang
5. Eng yaxshi yondashuvni tanlang va bajaring
Xulosa:"""
Function calling — the LLM's "hands"
function_calling.py
"""The model can call tools to do real work"""
import json
tools = [
{
"name": "get_weather",
"description": "Shaharning ob-havo holatini olish",
"input_schema": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "Shahar nomi"},
"units": {"type": "string", "enum": ["celsius", "fahrenheit"]}
},
"required": ["city"]
}
},
{
"name": "search_database",
"description": "Ichki ma'lumotlar bazasida qidirish",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"limit": {"type": "integer", "default": 10}
},
"required": ["query"]
}
}
]
async def chat_with_tools(user_message: str):
messages = [{"role": "user", "content": user_message}]
while True:
response = await client.messages.create(
model="claude-opus-4-7",
max_tokens=1024,
tools=tools,
messages=messages,
)
if response.stop_reason == "end_turn":
return response.content[0].text
if response.stop_reason == "tool_use":
# The model requested a tool call
messages.append({"role": "assistant", "content": response.content})
tool_results = []
for block in response.content:
if block.type == "tool_use":
result = await execute_tool(block.name, block.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": json.dumps(result)
})
messages.append({"role": "user", "content": tool_results})
# Loop — the model takes the next step with the tool result
12
rag architectures — naive to agentic
12.1
why rag, and how it works
The LLM knows a lot — but not your data. How do you plug it in?
answer
RAG = Retrieval-Augmented Generation — the core idea
An LLM (ChatGPT, Claude, Llama) is extremely knowledgeable, but its knowledge is frozen. Two problems: (1) knowledge cutoff — it knows nothing newer than its training cutoff; (2) private data — it has never seen your company documents, your DB, your internal wiki. Fine-tuning is not the answer either — you cannot retrain on every new piece of data.
The RAG solution: when a question arrives, first fetch the relevant documents, then tell the LLM: "answer based on these documents". The LLM now uses the supplied context rather than only its own knowledge. Result: hallucinations drop (answers come with sources), private data becomes usable, and freshness is real-time (a new row in the DB is visible immediately).
How does vector search work? — the mathematical basis
The difference from ordinary database search: SQL's WHERE text LIKE '%query%' is literal matching only. Search for "raqamli bank" and you will not find "digital banking". Vector search instead uses a mathematical measure that captures meaning (semantics).
An embedding model represents each text as a 1000+ dimensional vector (a list of numbers). Texts with similar meanings get vectors that lie close together. The vectors for "raqamli bank" and "digital banking" end up near each other — cosine similarity around 0.93. Given the query vector, we search the DB for the nearest (most similar) vectors — this is k-NN search. The HNSW algorithm makes this search take milliseconds even over millions of vectors.
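Cosine similarity itself is a few lines of math — a sketch with toy 3-dimensional vectors (real embeddings have 1000+ dimensions, and the numbers here are made up for illustration):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Dot product divided by the product of vector lengths → [-1, 1]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Toy vectors: two "similar meaning" texts vs an unrelated one
digital_bank = [0.9, 0.8, 0.1]
digital_banking = [0.85, 0.82, 0.15]
weather = [0.1, 0.05, 0.95]

assert cosine_similarity(digital_bank, digital_banking) > 0.9   # near-synonyms
assert cosine_similarity(digital_bank, weather) < 0.4           # unrelated
```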
Classic RAG pipeline
Naive RAG — the simplest implementation
naive_rag.py
from openai import AsyncOpenAI
from qdrant_client import AsyncQdrantClient
openai = AsyncOpenAI()
qdrant = AsyncQdrantClient(url="...")
# ─── INDEXING (offline, once) ───
async def index_document(doc_id: str, content: str, metadata: dict):
# 1. Chunking
chunks = chunk_text(content, chunk_size=500, overlap=50)
# 2. Embedding (batch)
embeddings = await openai.embeddings.create(
model="text-embedding-3-small", # 1536 dim
input=chunks
)
# 3. Store
points = [
{"id": f"{doc_id}_{i}",
"vector": emb.embedding,
"payload": {"doc_id": doc_id, "chunk_index": i, "text": chunk, **metadata}}
for i, (chunk, emb) in enumerate(zip(chunks, embeddings.data))
]
await qdrant.upsert(collection_name="kb", points=points)
# ─── QUERY (real-time) ───
async def answer_question(question: str, tenant_id: str) -> dict:
# 1. Embed question (same model as indexing!)
query_emb = (await openai.embeddings.create(
model="text-embedding-3-small",
input=[question]
)).data[0].embedding
# 2. Retrieve top-k
results = await qdrant.search(
collection_name="kb",
query_vector=query_emb,
query_filter={"must": [{"key": "tenant_id", "match": {"value": tenant_id}}]},
limit=5,
score_threshold=0.7 # filter out low scores
)
# 3. Build context
context = "\n\n".join([
f"[Source {i+1}] {r.payload['text']}"
for i, r in enumerate(results)
])
# 4. LLM generation
response = await openai.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content":
"Javob bering FAQAT berilgan context asosida. "
"Agar context yetarli emas — 'Ma'lumot yo'q' deb yozing. "
"Har faktdan keyin [Source N] manba ko'rsating."},
{"role": "user", "content": f"Context:\n{context}\n\nSavol: {question}"}
],
temperature=0.1 # low, for factual answers
)
return {
"answer": response.choices[0].message.content,
"sources": [{"text": r.payload["text"], "score": r.score, "doc_id": r.payload["doc_id"]}
for r in results]
}
Chunking strategies — splitting correctly is key
Strategy       | Description                                    | When
Fixed size     | cut every 500 tokens                           | simple, reliable baseline
Semantic       | split on paragraphs/headings                   | structured content (docs, articles)
Sliding window | 500 tokens + 50 overlap                        | context continuity matters
Sentence-aware | split at sentence boundaries                   | QA use cases
Parent-child   | search small chunks → return big chunk context | precision + context balance
Document-aware | full doc + summary                             | small corpus, full context needed
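The chunk_text helper used by the indexing code above is never shown; here is a minimal sliding-window sketch (it counts whitespace-separated words as a rough stand-in for tokens — a real pipeline would use a tokenizer):

```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Fixed-size chunks with overlap; words as a rough proxy for tokens."""
    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunk = " ".join(words[start:start + chunk_size])
        if chunk:
            chunks.append(chunk)
        if start + chunk_size >= len(words):
            break  # the last window already covered the tail
    return chunks

# 1200 words → windows at 0-500, 450-950, 900-1200
chunks = chunk_text("w " * 1200, chunk_size=500, overlap=50)
assert len(chunks) == 3
assert len(chunks[0].split()) == 500
```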
12.2
hybrid rag — production baseline
Pure vector search is not enough — what is hybrid, and why?
answer
Vector is semantic, BM25 is keyword
Vector search retrieves by meaning — it treats "Python error" and "Python exception" as the same. But it is bad at IDs, model names, and unique terms — it cannot tell "GPT-4o-mini" apart from "GPT-4o". BM25 keyword search is strong exactly there. Hybrid = both.
Hybrid search implementation
hybrid_rag.py
"""Hybrid = BM25 (keyword) + Vector (semantic) + RRF fusion"""
async def hybrid_search(question: str, k: int = 20) -> list[dict]:
# 1. Vector search — semantic
query_emb = await embed(question)
vector_results = await qdrant.search(
collection_name="kb",
query_vector=query_emb,
limit=k
)
# 2. BM25 search — keyword (PostgreSQL yoki Elasticsearch)
bm25_results = await db.fetch_all("""
SELECT id, chunk, doc_id,
ts_rank_cd(search_vector, query) AS score
FROM knowledge_chunks,
plainto_tsquery('english', $1) query
WHERE search_vector @@ query
ORDER BY score DESC
LIMIT $2
""", question, k)
# 3. Reciprocal Rank Fusion (RRF) — merge the two ranked lists
scores = {}
RRF_K = 60
for rank, r in enumerate(vector_results):
scores[r.id] = scores.get(r.id, 0) + 1.0 / (RRF_K + rank)
for rank, r in enumerate(bm25_results):
scores[r["id"]] = scores.get(r["id"], 0) + 1.0 / (RRF_K + rank)
# 4. Sort and return
top_ids = sorted(scores.keys(), key=lambda i: scores[i], reverse=True)[:10]
return [await get_chunk(id) for id in top_ids]
async def rag_with_rerank(question: str) -> dict:
# 1. Hybrid retrieval — over-fetch (20)
candidates = await hybrid_search(question, k=20)
# 2. Reranker — cross-encoder (expensive but accurate)
# Query + chunk go through one model together → sharper scores
from cohere import AsyncClient as CohereClient
cohere = CohereClient(...)
rerank_result = await cohere.rerank(
query=question,
documents=[c["text"] for c in candidates],
model="rerank-english-v3.0",
top_n=5
)
# 3. Top 5 — the highest-quality context
reranked = [candidates[r.index] for r in rerank_result.results]
# 4. LLM
return await generate_with_context(question, reranked)
✓
Reranker — the hidden superpower
A reranker (Cohere Rerank, BGE-reranker) improves retrieval accuracy by 30-50%. A cross-encoder scores the query and each candidate together, producing a much more precise score. Over-fetch 20 → rerank → top 5 — the 2026 production standard.
10 RAG architectures
Type                      | What it is                                          | Use case
1. Naive RAG              | vanilla embed → retrieve → generate                 | prototypes, getting started
2. Hybrid RAG             | BM25 + vector + RRF                                 | production baseline
3. Rerank RAG             | over-fetch + cross-encoder rerank                   | high precision needed
4. HyDE                   | Hypothetical Document Embedding — the LLM generates a hypothetical answer, which is embedded and used for search | sparse queries
5. Graph RAG              | knowledge graph + vector                            | entity relationships needed
6. Multi-modal RAG        | text + image + video                                | catalogs, media
7. Agentic RAG            | an agent retrieves iteratively and decides          | multi-hop, research
8. Self-correcting (CRAG) | retrieval quality check → re-retrieve               | complex queries
9. Adaptive RAG           | question type → different strategy                  | mixed workloads
10. Contextual RAG        | a "context prefix" per chunk (Anthropic)            | 35%+ accuracy improvement
Contextual Retrieval — a breakthrough technique
Anthropic's Contextual Retrieval technique: before indexing each chunk, we ask an LLM "what context does this chunk come from?" and prepend the short explanation. It noticeably improves both embedding and BM25 quality.
contextual_rag.py
CONTEXTUALIZE_PROMPT = """Quyida to'liq hujjat va undan olingan qism.
Berilgan qism hujjatning qaysi qismiga tegishli, qanday context'da ekanini qisqa
50-100 so'zda tushuntiring.
Full document:
{full_doc}
Chunk:
{chunk}
Context (faqat qisqa izoh, hech narsa qo'shmang):"""
async def contextualize_chunk(full_doc: str, chunk: str) -> str:
"""The short context note prepended to each chunk"""
response = await client.messages.create(
model="claude-haiku-4-5-20251001", # small model — cheap
max_tokens=150,
messages=[{
"role": "user",
"content": CONTEXTUALIZE_PROMPT.format(full_doc=full_doc[:50000], chunk=chunk)
}]
)
return response.content[0].text
async def index_with_context(doc_id: str, content: str):
chunks = chunk_text(content, 500)
# Contextualize each chunk
contextualized = await asyncio.gather(*[
contextualize_chunk(content, c) for c in chunks
])
# Chunk = context + original
final_chunks = [f"{ctx}\n\n{chunk}" for ctx, chunk in zip(contextualized, chunks)]
# Embed and store
embeddings = await embed_batch(final_chunks)
await qdrant.upsert(...)
Agentic RAG — the next frontier
Classic RAG is one-shot: retrieve once, generate once. In Agentic RAG an LLM agent searches iteratively and decides for itself whether enough information has been gathered.
agentic_rag.py
"""The agent uses retrieval as a tool"""
TOOLS = [
{
"name": "search_knowledge",
"description": "Ichki KB'dan qidirish. Multiple tadqiqot savollari uchun bir necha marta chaqirish mumkin.",
"input_schema": {
"type": "object",
"properties": {"query": {"type": "string"}, "limit": {"type": "integer", "default": 5}},
"required": ["query"]
}
},
{
"name": "search_web",
"description": "Yangi tashqi ma'lumot kerak bo'lsa web search",
"input_schema": {
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"]
}
}
]
AGENTIC_SYSTEM = """Sen research agentsan. Murakkab savollarga javob berish uchun:
1. Savolni sub-savollarga bo'l
2. Har biri uchun search_knowledge yoki search_web ishlat
3. Yetarli ma'lumot yig'ilgach — sintez qil
4. MAX 5 ta search iteratsiyasi"""
async def agentic_rag_answer(question: str, max_iterations: int = 5):
messages = [{"role": "user", "content": question}]
iteration = 0
while iteration < max_iterations:
response = await client.messages.create(
model="claude-opus-4-7",
max_tokens=2000,
tools=TOOLS,
system=AGENTIC_SYSTEM,
messages=messages
)
if response.stop_reason == "end_turn":
return response.content[0].text
messages.append({"role": "assistant", "content": response.content})
tool_results = []
for block in response.content:
if block.type == "tool_use":
if block.name == "search_knowledge":
results = await hybrid_search(block.input["query"])
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": json.dumps(results[:5])
})
messages.append({"role": "user", "content": tool_results})
iteration += 1
# Budget exhausted — answer with what we have
messages.append({"role": "user", "content": "Budget tugadi. Hozirgi ma'lumot asosida javob ber."})
final = await client.messages.create(model="claude-opus-4-7", max_tokens=2000, messages=messages)
return final.content[0].text
✦
Agentic RAG — expensive but accurate
Classic RAG: ~2000 tokens/query. Agentic RAG: 10-20k tokens (3-10x). But answer quality on multi-hop questions improves dramatically. Stop conditions are essential — iteration budget, confidence threshold, cost cap.
12.3
rag evaluation — how to measure
How do you know whether your RAG system is actually working well?
answer
Key metrics
Metric            | What it measures                       | Tool
Context Precision | are the retrieved chunks relevant?     | RAGAS
Context Recall    | were all the needed chunks found?      | RAGAS
Faithfulness      | is the answer grounded in the context? | RAGAS
Answer Relevance  | does the answer address the question?  | RAGAS
Hit Rate @k       | is the correct chunk in the top-k?     | Custom
MRR               | average rank of the correct chunk      | Custom
ragas_eval.py
from ragas import evaluate
from ragas.metrics import context_precision, context_recall, faithfulness, answer_relevancy
from datasets import Dataset
# Ground truth dataset
eval_data = Dataset.from_dict({
"question": [...], # the questions
"answer": [...], # your RAG system's answers
"contexts": [...], # retrieved chunks
"ground_truth": [...] # reference answers (human-annotated)
})
result = evaluate(
eval_data,
metrics=[context_precision, context_recall, faithfulness, answer_relevancy]
)
print(result)
# {'context_precision': 0.82, 'context_recall': 0.78, ...}
13
embeddings & vector search — deep dive
13.1
embedding models — choices and tradeoffs
Which embedding model should you pick, and when?
answer
Embedding — text → vector
An embedding model turns text into a fixed-length vector of numbers. Semantic similarity is proportional to the distance between vectors: "Python code" and "Python programming" land close together. Embedding quality directly determines RAG quality.
Embedding models landscape
Model                         | Dim  | Context | Price / Deploy    | Best for
OpenAI text-embedding-3-large | 3072 | 8192    | $0.13/1M tokens   | English, general
OpenAI text-embedding-3-small | 1536 | 8192    | $0.02/1M          | cost-effective baseline
Cohere embed-v3               | 1024 | 512     | $0.10/1M          | multilingual
Voyage-3                      | 1024 | 32k     | $0.06/1M          | long docs, technical
BGE-M3 (BAAI)                 | 1024 | 8k      | self-host, free   | multilingual open-source
Nomic-embed-text-v1.5         | 768  | 8k      | self-host / $0.02 | open, good English
Jina-embeddings-v3            | 1024 | 8k      | self-host         | multilingual, fine-tunable
Matryoshka embeddings — the adaptive trick
Newer embedding models support dimension truncation: cut a 3072-dim embedding down to 256 dims and usually 90%+ of the quality is preserved, while disk/RAM shrink 12x.
matryoshka.py
# OpenAI — the dimensions parameter
response = await openai.embeddings.create(
model="text-embedding-3-large",
input=["hello"],
dimensions=512 # truncate 3072 → 512
)
# Or two-stage indexing:
# coarse search: 256-dim (fast)
# rerank: full 3072-dim (accurate)
Multilingual — and Uzbek
Embedding models that handle Uzbek: Cohere embed-v3 (solid multilingual), BGE-M3 (open-source), multilingual-e5-large (self-host). OpenAI works reasonably well here, but it is not the best option.
13.2
rerankers — precision booster
What is a reranker, and why does it boost accuracy by 30-50%?
answer
Bi-encoder vs cross-encoder
An embedding model (bi-encoder) embeds the query and the chunk separately. Fast, but there is no query-chunk interaction. A reranker (cross-encoder) feeds query + chunk into the model together. Slower, but much more accurate.
rerank.py
# Option 1 — Cohere Rerank (managed)
from cohere import AsyncClient
cohere = AsyncClient(api_key="...")
async def cohere_rerank(query: str, docs: list[str], top_n: int = 5):
result = await cohere.rerank(
query=query,
documents=docs,
model="rerank-multilingual-v3.0", # 100+ languages
top_n=top_n
)
return [(r.index, r.relevance_score) for r in result.results]
# Option 2 — BGE-reranker (self-host)
from FlagEmbedding import FlagReranker
reranker = FlagReranker('BAAI/bge-reranker-v2-m3', use_fp16=True)
def local_rerank(query: str, docs: list[str], top_n: int = 5):
pairs = [[query, doc] for doc in docs]
scores = reranker.compute_score(pairs, normalize=True)
ranked = sorted(enumerate(scores), key=lambda x: x[1], reverse=True)[:top_n]
return ranked
# Option 3 — Jina reranker
from jina_ai_search_foundation_sdk import JinaRerank
✓
Over-fetch → rerank pattern
In production, always: retrieve 20-30 → rerank to top 5. This over-fetch strategy barely adds latency (10-50ms) but improves accuracy dramatically. Every RAG system needs this pattern.
14
fine-tuning & local llms — lora, qlora, deployment
14.1
qachon fine-tune kerak vs rag vs prompting
Prompt, RAG, yoki fine-tuning — qaysi birini qachon?
javob
Decision tree — nima kerak?
─────────────────────────────────────
Yangi/aniq faktual ma'lumot kerakmi?
├─ Ha → RAG (knowledge injection)
└─ Yo'q, bilim fixed
   └─ Behavior/style/format o'zgartirmoqchimisiz?
      ├─ Yo'q → Prompt engineering yetarli
      ├─ Ha, arzon usul → Few-shot prompt, system prompt
      └─ Ha, consistent/volume → Fine-tune (LoRA/QLoRA)
Hybrid pattern — eng kuchli:
Base model + Fine-tuned adapter + RAG context
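Yuqoridagi decision tree'ni soddalashtirilgan funksiya sifatida ham yozish mumkin (sketch — real qaror ko'proq nuance'larga bog'liq):

```python
def texnika_tanla(yangi_faktlar: bool, uslub_ozgarishi: bool, katta_hajm: bool) -> str:
    """Decision tree: RAG vs prompting vs fine-tune (soddalashtirilgan)."""
    if yangi_faktlar:
        return "RAG"                      # knowledge injection
    if not uslub_ozgarishi:
        return "Prompt engineering"       # behavior o'zgarishi shart emas
    if katta_hajm:
        return "Fine-tune (LoRA/QLoRA)"   # consistent/volume
    return "Few-shot prompt"              # arzon usul

print(texnika_tanla(True, False, False))   # RAG
print(texnika_tanla(False, True, True))    # Fine-tune (LoRA/QLoRA)
```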
Texnika | Cost | Latency | Qachon yaxshi
Prompting | $ (faqat API calls) | Normal | Boshlash, kichik use-case
Few-shot | $$ (token cost oshadi) | Normal | Oddiy format, 3-5 misol
RAG | $$ (embed + retrieve + gen) | Sekinroq | Faktual, yangilanib turadigan ma'lumot
LoRA fine-tune | $$$ (bir martalik) | Fast inference | Style, format, domain
Full fine-tune | $$$$$ | Fast | Kamdan-kam holat, enterprise
14.2
lora va qlora — parameter-efficient fine-tuning
LoRA va QLoRA qanday ishlaydi va qanday qilinadi?
javob
LoRA sehri
7B parametrli model'ning hammasini fine-tune qilish → 120 GB VRAM kerak. LoRA (Low-Rank Adaptation) — base model parametrlarini "muzlatamiz", kichik (<0.1%) qo'shimcha "adapter" matritsalarni o'rgatamiz. 24 GB VRAM yetadi. QLoRA — base model'ni 4-bit'ga quantize + LoRA. 12 GB VRAM — RTX 4070 Ti!
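LoRA matematikasini numpy bilan ko'rsatish mumkin: W muzlatiladi, faqat B va A o'rgatiladi, effective weight W' = W + (alpha/r)·B·A. O'lchamlar (d, r, alpha) illustrativ:

```python
import numpy as np

d, r, alpha = 1024, 8, 16  # hidden dim, LoRA rank, scaling — illustrativ qiymatlar

rng = np.random.default_rng(0)
W = rng.standard_normal((d, d))          # muzlatilgan base weight
A = rng.standard_normal((r, d)) * 0.01   # trainable down-projection
B = np.zeros((d, r))                     # trainable up-projection, 0'dan boshlanadi

def effective_weight(W, A, B, alpha, r):
    """LoRA: W' = W + (alpha / r) * B @ A"""
    return W + (alpha / r) * (B @ A)

full_params = d * d
lora_params = d * r + r * d
print(f"Full: {full_params:,} | LoRA: {lora_params:,} "
      f"({100 * lora_params / full_params:.2f}% trainable)")
# B = 0 bo'lgani uchun boshida W' == W — training base behavior'dan boshlanadi
assert np.allclose(effective_weight(W, A, B, alpha, r), W)
```

Real modelda d = 4096+ va har attention layer'da bir nechta shunday matritsa bor — trainable ulush 0.1-1% atrofida bo'ladi, shuning uchun VRAM talabi keskin kamayadi.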
500 ta high-quality misol > 50,000 ta noisy misol. Har misol model'ga "ideal javob" ko'rsatadi. 1-2 xato misol modelga noto'g'ri habit o'rgatadi. Human review shart.
dataset.jsonl
{"instruction": "Berilgan Python kodda xato topib, tuzatilgan versiyani bering.",
"input": "def divide(a, b): return a / b",
"output": "```python\ndef divide(a: float, b: float) -> float:\n if b == 0:\n raise ValueError('Cannot divide by zero')\n return a / b\n```\n\nTuzatish: `ZeroDivisionError` himoyasi va type hints qo'shildi."}
{"instruction": "SQL query'ni optimallashtiring.",
"input": "SELECT * FROM orders WHERE UPPER(email) = 'TEST@EXAMPLE.COM'",
"output": "..."}
Local deployment — Ollama
ollama_deploy.sh
# Ollama install
curl -fsSL https://ollama.com/install.sh | sh
# Modelfile yaratish (fine-tuned modelni Ollama'ga qo'shish)
cat > Modelfile << 'EOF'
FROM ./gguf_model/unsloth.Q4_K_M.gguf
TEMPLATE """### Instruction:
{{ .Prompt }}
### Response:
"""
PARAMETER temperature 0.7
PARAMETER top_p 0.9
PARAMETER num_ctx 2048
EOF
# Model create
ollama create my-assistant -f Modelfile
# Ishga tushirish
ollama run my-assistant
# API sifatida (OpenAI-compatible)
# http://localhost:11434/v1/chat/completions
$300 GPU + 500 ta misol + 2-3 soat training = production-ready domain-specific model. Hamma biznes o'ziga shunday model yarata oladi. Brand voice, compliance, offline inference. Hybrid: base model (general) + LoRA adapter (domain) + RAG (facts) — 2026 arxitektura.
15
ai agents, multi-agent & mcp
15.1
ai agents — autonomous task executors
AI agent nima va passiv chatbot'dan qanday farq qiladi?
javob
Chatbot vs Agent — asosiy farq
Chatbot: savol → javob. Passiv, bir martalik. Foydalanuvchi har qadamda yo'l-yo'riq berishi kerak. Agent: maqsad beriladi → agent o'zi reja tuzadi → kerakli vositalarni (tool) ishlatadi → natijani tekshiradi → agar xato bo'lsa, o'zini tuzatadi → maqsadga yetguncha takrorlaydi. Autonomous, iterativ. "GitHub issue'larni tahlil qilib, eng muhimini Discord'ga yozib qo'y" — bu agent ishi, chatbot qila olmaydi.
ReAct pattern — agent "fikrlash" tarzini tushunish
Agent qanday qaror qiladi? Eng keng tarqalgan yondashuv — ReAct (Reason + Act). Har qadamda LLM uch narsa qiladi: (1) Thought — "Endi nima qilishim kerak, qanday mantig'?" deb o'ylaydi (inson kabi fikrlash); (2) Action — biror tool chaqiradi (DB query, API call, file read); (3) Observation — tool natijasini ko'radi. Keyin yangi Thought boshlanadi. Bu cycle maqsadga yetilguncha yoki max_iterations tugaguncha davom etadi.
Misol: "Beeline'ning oxirgi oylik revenue qancha?" → Thought: "DB'ga so'rov yuborish kerak" → Action: query_db("SELECT SUM(revenue)...") → Observation: "1,240,000,000 UZS" → Thought: "Endi bu raqamni formatlash kerak" → Action: format_currency(...) → Final Answer.
Agent anatomy — 4 komponent
ReAct pattern — reasoning + acting
react_agent.py
"""ReAct: Thought → Action → Observation loop"""
REACT_PROMPT = """Sen agentsan. Berilgan vazifani bajarish uchun tool'lardan foydalanasan.
Har qadamda:
Thought: nima qilmoqchiligingni o'yla
Action: tool chaqirish
Observation: tool natijasi (avtomatik to'ldiriladi)
Yakuniy javob bo'lganda:
Final Answer: to'liq javob
Tools:
- search_docs(query): ichki hujjatlarda qidirish
- fetch_url(url): sahifa matnini olish
- calculate(expression): hisoblash
- query_db(sql): ma'lumotlar bazasidan
Vazifa: {task}"""
async def react_agent(task: str, max_steps: int = 10):
messages = [{"role": "user", "content": REACT_PROMPT.format(task=task)}]
for step in range(max_steps):
response = await client.messages.create(
model="claude-opus-4-7",
max_tokens=1024,
tools=TOOLS,
messages=messages,
)
messages.append({"role": "assistant", "content": response.content})
# Terminal condition
if response.stop_reason == "end_turn":
for block in response.content:
if block.type == "text" and "Final Answer:" in block.text:
return block.text.split("Final Answer:")[-1].strip()
# Tool execution
if response.stop_reason == "tool_use":
tool_results = []
for block in response.content:
if block.type == "tool_use":
try:
result = await execute_tool(block.name, block.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": json.dumps(result)[:5000] # truncate
})
except Exception as e:
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": f"Error: {str(e)}",
"is_error": True
})
messages.append({"role": "user", "content": tool_results})
return "Max steps reached without final answer"
15.2
multi-agent systems — orchestration patterns
Bir agent yetmaydi — ko'p agent qanday birga ishlaydi?
javob
Ixtisoslashgan agent'lar
Bitta "hamma narsa qiladigan" agent sifat va aniqlikda past natija beradi. Yaxshiroq: researcher → analyst → writer — har biri bitta ishga ixtisoslashgan. Software team kabi: PM, backend dev, frontend dev, QA.
Supervisor pattern — hierarchical
multi_agent.py
"""Supervisor agent sub-agent'larga vazifalar tarqatadi"""
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal
class AgentState(TypedDict):
task: str
research: str
analysis: str
final_report: str
next_step: str
# Specialist agents
async def researcher_agent(state: AgentState):
"""Ma'lumot yig'ish"""
result = await run_agent(
system="Sen researcher. Faqat faktlarni yig'. Source cite qil.",
tools=[search_web, search_docs],
task=state["task"]
)
return {"research": result}
async def analyst_agent(state: AgentState):
"""Tahlil va insight"""
result = await run_agent(
system="Sen data analyst. Raqamlarni tushuntir, trend'larni top.",
tools=[calculate, query_db],
task=f"Research:\n{state['research']}\n\nTahlil qil."
)
return {"analysis": result}
async def writer_agent(state: AgentState):
"""Final report yozish"""
result = await run_agent(
system="Sen technical writer. Aniq, qisqa, strukturali yoz.",
task=f"Research:\n{state['research']}\n\nAnalysis:\n{state['analysis']}\n\nReport yoz."
)
return {"final_report": result}
# Supervisor
async def supervisor(state: AgentState) -> dict:
"""Supervisor keyingi kimni chaqirishni aniqlaydi"""
response = await client.messages.create(
model="claude-opus-4-7",
system="Sen supervisor. Berilgan state'ga qarab keyingi qadamni tanla: "
"researcher, analyst, writer, yoki done.",
messages=[{"role": "user", "content": json.dumps(state)}]
)
next_step = response.content[0].text.strip().lower()
return {"next_step": next_step}
def route(state: AgentState) -> str:
    # "done" bo'lsa graph END sentinel'iga boradi, aks holda keyingi agent nomi
    return state["next_step"] if state["next_step"] != "done" else END
# Graph tuzish
graph = StateGraph(AgentState)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher_agent)
graph.add_node("analyst", analyst_agent)
graph.add_node("writer", writer_agent)
graph.set_entry_point("supervisor")
graph.add_conditional_edges("supervisor", route)
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")
app = graph.compile()
# Run
result = await app.ainvoke({"task": "2026 RAG arxitekturalarini tahlil qiling"})
Swarm pattern — peer-to-peer
Agent'lar bir-biriga "handoff" qiladi, supervisor yo'q. Sodda, dinamik. OpenAI Swarm kutubxonasi yoki CrewAI.
crewai.py
from crewai import Agent, Task, Crew, Process
researcher = Agent(
role="Research Analyst",
goal="2026 backend trendlarini chuqur tadqiq qilish",
backstory="10 yil tajribali tech analyst",
tools=[search_tool, browser_tool],
verbose=True
)
writer = Agent(
role="Technical Writer",
goal="Research natijalari asosida clear article yozish",
backstory="Tech blog writer",
verbose=True
)
research_task = Task(
description="2026 backend architecture trends haqida comprehensive research",
agent=researcher,
expected_output="Detailed findings with sources"
)
write_task = Task(
description="Research asosida 2000 so'zli blog post yozish",
agent=writer,
expected_output="Publication-ready blog post",
context=[research_task] # dependency
)
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential
)
result = crew.kickoff()
Multi-agent patterns comparison
Pattern | Coordination | Best for
Sequential | Fixed pipeline | Well-defined workflows
Supervisor | Central dispatcher | Complex, branching tasks
Swarm/Handoff | Peer-to-peer | Dynamic, unpredictable
Hierarchical | Teams + managers | Large-scale, specialty teams
Debate/Consensus | Multiple agents agree | High-stakes decisions
15.3
mcp — model context protocol
MCP nima va nima uchun muhim?
javob
MCP — "USB-C for AI" — nima muammoni hal qiladi?
Muammo: har AI app o'z-o'zicha tool integration yozadi. GitHub Copilot o'z GitHub integration'ini yozgan, Cursor o'zinikini, Claude Desktop o'zinikini — har biri turlicha. Agar siz PostgreSQL integratsiyasini yozsangiz, u faqat bitta AI bilan ishlardi.
MCP yechimi: Anthropic 2024-yil oxirida open standard chiqardi. MCP server bitta protokolga muvofiq yoziladi → Claude, ChatGPT, Cursor, va istalgan MCP-compatible AI bilan ishlaydi. "Bir marta yoz, hamma joyda" — USB-C analogi. 2025-2026'da 2000+ MCP server ecosystem paydo bo'ldi (GitHub, Slack, Notion, PostgreSQL, Jira...).
MCP uchta primitiv — Tools, Resources, Prompts
Tools — LLM chaqira oladigan funksiyalar (DB query, API call, fayl o'qish). AI function calling bilan o'xshash, lekin standarti bor. Resources — LLM o'qiy oladigan ma'lumotlar (fayl tarkibi, DB row, URL content) — tool chaqirmay, context'ga qo'shiladi. Prompts — oldindan tayyorlangan prompt template'lar (masalan, "code review" — standart ko'rsatmalar bilan). Shu uchlik bir MCP server'da hammasi bo'lishi mumkin.
MCP architecture
MCP server yozish — minimal misol
mcp_server.py
"""MCP server — har AI client foydalanishi mumkin"""
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.types as types
server = Server("weather-mcp")
# 1. Tool — LLM chaqirishi mumkin
@server.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="get_weather",
description="Shahar ob-havosi",
inputSchema={
"type": "object",
"properties": {
"city": {"type": "string"},
"units": {"type": "string", "enum": ["c", "f"]}
},
"required": ["city"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
if name == "get_weather":
result = await fetch_weather(arguments["city"], arguments.get("units", "c"))
return [types.TextContent(type="text", text=json.dumps(result))]
# 2. Resource — static ma'lumot (masalan, fayl)
@server.list_resources()
async def list_resources() -> list[types.Resource]:
return [
types.Resource(
uri="weather://stations",
name="Weather stations catalog",
mimeType="application/json"
)
]
@server.read_resource()
async def read_resource(uri: str) -> str:
if uri == "weather://stations":
return json.dumps(await get_stations())
# 3. Prompt — reusable template
@server.list_prompts()
async def list_prompts() -> list[types.Prompt]:
return [
types.Prompt(
name="weather_report",
description="Ob-havo hisoboti yaratish",
arguments=[
types.PromptArgument(name="city", description="Shahar", required=True)
]
)
]
# Run stdio transport
if __name__ == "__main__":
import asyncio
from mcp.server.stdio import stdio_server
async def main():
async with stdio_server() as (read, write):
await server.run(read, write, InitializationOptions(
server_name="weather-mcp",
server_version="1.0.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={}
)
))
asyncio.run(main())
OpenAI, Google, Microsoft hammasi qabul qilishdi. MCP registry'da 500+ server mavjud. Your internal tools ham MCP server qilib yozsangiz — har AI tool'da ishlaydi. Enterprise integration standard bo'lmoqda.
August 2026'dan boshlab EU'da high-risk AI system'lar majburiy tarzda documented va audited bo'lishi kerak: technical documentation, risk assessment, human oversight, conformity certificate. EU mijozlariga xizmat ko'rsatayotgan Uzbek kompaniyalar ham shu talablar qamroviga tushadi.
Part V
architecture decisions & trade-offs
Texnikalarni bilish — yarmi. Qachon qaysini tanlash — ikkinchi yarmi. Bu qismda tradeoff'lar, decision framework'lar va 2026 career roadmap.
17
architecture decisions & trade-offs
17.1
tradeoffs matrix — umumiy qarorlar
Texnologiya tanlashda qanday tradeoff'lar bor?
javob
Architect'ning asosiy ishi
Code yozish — dasturchi ishi. Arxitektor — "trade-off'lar menejeri". Har qaror cost, latency, complexity, reliability va flexibility o'rtasidagi muvozanat. "Eng yaxshi" yo'q — "sizning kontekstingiz uchun optimal" bor.
Monolith vs Microservice
Aspect | Monolith | Microservices
Development speed (start) | Tez | Sekin (infra setup)
Development speed (scale) | Sekinlashadi | Tez (parallel teams)
Deployment | Bir button, bir risk | Independent, lekin orchestration kerak
Debugging | Oson (bitta log) | Qiyin (distributed tracing kerak)
Data consistency | ACID transactions | Saga, eventual consistency
Network cost | In-process (0) | gRPC/HTTP overhead
Team size | <20 dev | 50+ dev
When to use | Startup, MVP, small team | Scale, independent teams
Modular Monolith — to'g'ri yondashuv
Boshlang'ichda: modular monolith. Code'ni mantiqiy module'larga bo'ling (bounded contexts), lekin bitta deploy qiling. Kerak bo'lganda module'ni alohida service qilib chiqarish oson. "Microservice first" — kichik jamoalar uchun antipattern.
SQL vs NoSQL
PostgreSQL (SQL) — default
ACID, relational integrity
Complex queries, JOINs
JSONB — kerak bo'lsa NoSQL
Full-text search built-in
Vector support (pgvector)
Mature, 40+ yosh ecosystem
NoSQL — specific cases
MongoDB: schema-flexible doc store
DynamoDB: massive scale key-value
Cassandra: write-heavy time-series
Redis: cache, pubsub, real-time
Neo4j: graph relationships
Rule of thumb 2026: PostgreSQL'dan boshlang. 90% SaaS muammolari unda hal bo'ladi. Limit'ga urilgandagina specialize qiling.
Sync vs Async
Use case | Sync | Async
REST API request | ✓ default | Faqat I/O bound bo'lsa
Database query | Oddiy, ishonchli | Ko'p concurrent request bo'lsa
File upload/download | Sekin, bloklaydi | ✓ shart
External API calls | Bir necha sekund kutish | ✓ parallel, tez
Email, SMS | User kutsinmi? Yo'q | ✓ queue'ga
Reports, exports | Timeout risk | ✓ background job
Real-time push | Imkonsiz | ✓ WebSocket/SSE
Build vs Buy
Build qilish o'rinli:
- Core business logic (sizning raqobat afzalligingiz)
- Hech kim qo'llab-quvvatlamaydigan unique requirements
- Licensing restrictions
Buy (yoki open-source) o'rinli:
- Auth (Auth0, Clerk, Supabase Auth)
- Analytics (PostHog, Amplitude)
- Payments (Stripe, Paddle)
- Email (Resend, Postmark)
- Monitoring (Datadog, Grafana Cloud)
- LLM hosting (managed API)
- Vector DB (agar scale kerak bo'lsa)
Rule: "Build differentiator, buy commodity"
17.2
backend engineer — learning roadmap
Qanday tartibda o'rganish kerak? Junior'dan senior'gacha yo'l.
javob
Level 1 — Foundation (0-12 oy)
Junior backend engineer
Tillar: 1 ta tilni to'liq bilish (Python yoki TypeScript)
Framework: 1 ta production-grade (FastAPI, Express, Django)
Database: PostgreSQL fundamentals, yaxshi SQL
HTTP: REST API, JSON, status codes, cookies
Tools: Git, Docker basics, Linux commands
Testing: unit test yozish
Level 2 — Professional (1-3 yil)
Mid-level
DB deep: indexing, query tuning, transactions, locks
Caching: Redis patterns, cache invalidation
Async: Celery/queues, event-driven basics
Auth: OAuth, JWT, RBAC
Observability: logs, metrics, basic tracing
K8s basics: pod, deployment, service
CI/CD: pipeline yozish
AI basics: LLM API ishlatish, prompt engineering
Level 3 — Senior (3-6 yil)
Senior engineer
System design: mikroservis, event sourcing, CQRS
DB advanced: sharding, replication, multi-region
Distributed: consistency, CAP, Saga pattern
Performance: profiling, load testing
Security: threat modeling, zero trust
Cloud: AWS/GCP services deep
AI: RAG production, fine-tuning, agents
Leadership: mentoring, tech decisions, ADRs
Level 4 — Staff/Principal (6+ yil)
Staff+ engineer
Technical strategy: multi-year architecture vision
Cross-team impact: butun kompaniya darajasida
Business sense: tech qarorlarning biznes ta'siri
Specialized depth: biror sohada top expert (distributed systems, AI infrastructure, security, ...)
Mentoring at scale: senior'larni yetishtirish
AI-specific track — zamonaviy yo'l
Backend engineer bo'lib tursangiz ham, AI exposure shart. Bu bo'limlarni alohida o'rganing:
LLM fundamentals — qanday ishlaydi, tokens, context, cost
AI backend'ni almashtirmaydi, lekin backend engineer'ni o'zgartiradi. Faqat CRUD API yozadigan dev — bozordan tushib ketadi. AI-native architect — biznes muammosini AI bilan hal qila oladigan muhandis — uchun bo'sh vakansiyalar ko'payib boradi. Prompt yozish, RAG pipeline qurish, agent orchestration — bular yangi "normal" ko'nikmalar.
17.3
umumiy tamoyillar — o'zgarmas qoidalar
yakun
Architect'ning yadrosi
Texnikalar o'zgaradi — tamoyillar qoladi. Quyida 2026'da ham o'z kuchini saqlab qolgan principles.
Simple > clever. Oddiy kod — oson debug, oson qayta yozish. Aqlli kod 6 oydan keyin sizning o'zingizga tushunarsiz.
Boring technology wins. PostgreSQL 30 yoshda. Redis 15 yoshda. Ishlatilgan, ishonchli. Har yili yangi trend texnologiya — ehtiyot bo'ling.
Premature optimization is the root of all evil. Measure first, optimize after. Lekin loyihani boshidan noto'g'ri qurmang — arxitektura muhim.
Fail fast, fail loudly. Silent failure — eng xavfli. Xato — tez sezilib, alert bo'lsin.
Idempotency is a superpower. Har operatsiya ikki marta bajarilsa ham xavfsiz bo'lsin. Retries, replays, chaos — hammasi oson bo'ladi.
Observability before optimization. Qayerda sekin bo'layotganini bilmasdan optimize qilish — ko'z yumib otish.
Security is not a feature. Har layer'da o'ylanishi kerak. Defense in depth.
Cost is architecture. Cloud cost = design choice. $10k/oy'lik system $1k/oy'gacha tushirilishi mumkin (tahlil + re-architect).
Write for humans. Kod boshqa odamlar uchun yoziladi. Kelajakdagi siz ham — o'sha "boshqa odam".
Tests are documentation. Yaxshi test — kod qanday ishlatilishini ko'rsatadi.
Deletion is better than addition. Kamroq kod — kamroq bug, kamroq maintenance.
YAGNI — You Aren't Gonna Need It. Kelajakdagi "ehtimol kerak bo'ladi" feature'lar uchun kod yozmang.
"Any fool can write code that a computer can understand.
Good programmers write code that humans can understand." — Martin Fowler
Part VI
system design patterns
Real interview va production'da uchraydigan klassik system design muammolari. Har biri: requirements → scale estimation → HLD diagram → deep dive → trade-offs. URL shortener, chat, news feed, video streaming — bularni tushungan muhandis istalgan tizimni loyihalashi mumkin.
18
system design — interview & production patterns
18.1
url shortener — design bit.ly / tinyurl
Millionlab foydalanuvchi uchun URL qisqartirish xizmatini qanday loyihalash kerak?
javob
Nima uchun bu muammo muhim?
URL shortener — ko'rinishidan sodda, lekin ichida murakkab muhandislik bor. 100:1 read/write nisbati — 1 URL qo'shiladi, 100 marta bosiladi. Sub-millisecond redirect kerak (user kutmaydi). 100 milliard+ URL saqlanishi mumkin. Bu muammo scalability, caching, hashing va consistency bo'yicha hamma narsani sinovdan o'tkazadi.
Requirements va Scale Estimation
Functional requirements:
- Uzun URL → qisqa URL (7 belgi, base62)
- Qisqa URL → redirect (301 Permanent yoki 302 Found)
- URL expiry (ixtiyoriy, masalan 30 kun)
- Analytics (nechta marta bosildi)
Non-functional:
- 100M yangi URL/kun yaratish
- 10B redirect/kun (100:1 read:write)
- 99.99% uptime (SLA)
- Redirect latency < 10ms (p99)
Back-of-envelope:
Writes: 100M/day = ~1,160/sec
Reads: 10B/day = ~116,000/sec (peak 300k/sec)
Storage: 7 bytes (key) + 200 bytes (URL) + metadata = ~500 bytes/URL
5 yil saqlasak: 100M * 365 * 5 * 500B = ~90TB
Cache (20% hot URLs): ~18TB
High-Level Architecture
Key Decision: Base62 encoding
Nima uchun Base62 va 7 ta belgi?
62 ta belgi (a-z, A-Z, 0-9) dan 7 ta pozitsiya bilan 62⁷ = 3.5 trillion kombinatsiya — 100 yilga yetarli (100M URL/kun hisobida). MD5/SHA hash ishlatsak — collision imkoniyati bor. Snowflake ID (Twitter-dan) — distributed, time-sorted, collision-free 64-bit integer → Base62 encode qilamiz. Bu yondashuv ham global unique, ham sortable (analytics uchun qulay).
url_shortener.py
import string
import time
BASE62 = string.ascii_letters + string.digits # 62 ta belgi
# Snowflake-inspired ID generator
class SnowflakeIDGenerator:
EPOCH = 1700000000000 # ms, custom epoch
DATACENTER_BITS = 5
MACHINE_BITS = 5
SEQUENCE_BITS = 12
def __init__(self, datacenter_id: int, machine_id: int):
self.datacenter_id = datacenter_id
self.machine_id = machine_id
self.sequence = 0
self.last_timestamp = -1
def next_id(self) -> int:
ts = int(time.time() * 1000) - self.EPOCH
if ts == self.last_timestamp:
self.sequence = (self.sequence + 1) & 4095 # 12 bit
if self.sequence == 0:
while ts <= self.last_timestamp:
ts = int(time.time() * 1000) - self.EPOCH
else:
self.sequence = 0
self.last_timestamp = ts
# 41 bit ts | 5 bit datacenter | 5 bit machine | 12 bit seq
return (ts << 22) | (self.datacenter_id << 17) | (self.machine_id << 12) | self.sequence
def encode_base62(num: int) -> str:
    """int → Base62. Pad belgisi sifatida '0' emas, BASE62[0] ishlatiladi —
    aks holda decode_base62 bilan round-trip buziladi ('0' = BASE62[52]).
    Eslatma: to'liq 64-bit Snowflake ID 11 tagacha belgi berishi mumkin;
    7 belgi kafolati uchun ID maydoni 62^7 dan kichik bo'lishi kerak."""
    if num == 0:
        return BASE62[0] * 7
    chars = []
    while num > 0:
        chars.append(BASE62[num % 62])
        num //= 62
    return ''.join(reversed(chars)).rjust(7, BASE62[0])
def decode_base62(code: str) -> int:
num = 0
for c in code:
num = num * 62 + BASE62.index(c)
return num
# FastAPI endpoint
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import RedirectResponse
import redis.asyncio as aioredis
app = FastAPI()
redis_client = aioredis.from_url("redis://localhost", decode_responses=True)  # bytes emas, str qaytaradi
id_gen = SnowflakeIDGenerator(datacenter_id=1, machine_id=1)
@app.post("/shorten")
async def shorten(long_url: str, user_id: str = "anonymous", ttl_days: int = 30):
# Rate limit check (token bucket from 1.3 bo'limi)
allowed, remaining = await check_rate_limit(user_id)
if not allowed:
raise HTTPException(429, "Rate limit exceeded")
short_id = encode_base62(id_gen.next_id())
# Redis'da saqlash (primary fast path)
await redis_client.setex(
f"url:{short_id}",
ttl_days * 86400,
long_url
)
# DB'ga async yozish (Cassandra — eventual consistency OK)
await db.insert_url(short_id=short_id, long_url=long_url,
user_id=user_id, created_at=time.time(),
expires_at=time.time() + ttl_days * 86400)
return {"short_url": f"https://bit.ly/{short_id}", "short_id": short_id}
@app.get("/{short_id}")
async def redirect(short_id: str, request: Request):  # Request — fastapi importiga qo'shiladi
# 1. Cache'dan qidirish (hit rate ~80%)
long_url = await redis_client.get(f"url:{short_id}")
if not long_url:
# 2. DB'dan qidirish
record = await db.get_url(short_id)
if not record or record.expires_at < time.time():
raise HTTPException(404, "URL not found or expired")
long_url = record.long_url
# Cache'ga qo'shish (cache-aside pattern)
await redis_client.setex(f"url:{short_id}", 86400, long_url)
# Analytics event async (Kafka)
await kafka.send("url.clicked", {
"short_id": short_id, "timestamp": time.time(),
"user_agent": request.headers.get("user-agent"),
"ip": request.client.host
})
# 301 Permanent (browser caches) yoki 302 Found (har safar serverga)
# Analytics kerak bo'lsa 302, SEO uchun 301
return RedirectResponse(url=long_url, status_code=302)
Trade-offs: 301 vs 302 Redirect
Aspekt | 301 Permanent | 302 Found
Browser cache qiladimi? | Ha — keyingi so'rov serverga bormaydi | Yo'q — har safar serverga
Server load | Juda past (browser cache) | Har click serverga keladi
Analytics | Imkonsiz (browser cache'lagan) | Har click log'lanadi
URL o'zgarishi mumkinmi? | Yo'q (cached forever) | Ha (server qaytargan URL)
Tavsiya | Analytics kerak emas, max performance | Click tracking kerak bo'lsa
Production pattern — Cassandra yoki DynamoDB
URL shortener uchun NoSQL to'g'ri tanlov: bitta access pattern — short_id → long_url. JOIN kerak emas. Cassandra'da short_id partition key bilan millisecond lookup, horizontally scalable, geo-distributed. DynamoDB'da single-table design bilan ham xuddi shunday.
18.2
chat system — design whatsapp / telegram
Real-time chat tizimini qanday loyihalash kerak — 1 milliard foydalanuvchi uchun?
javob
Chat nima uchun qiyin?
Oddiy HTTP request-response model ishlamaydi — server xabar kelganda client'ga push qilishi kerak (server-initiated). Bu "bitta serverga barcha connection" muammosini keltirib chiqaradi. 1B user, har biri online = 1B WebSocket connection — bitta serverda imkonsiz, distributed bo'lishi kerak. Bundan tashqari: offline delivery (user offline bo'lsa xabar saqlanadi), group chat (fanout — bitta xabar N ta odamga), end-to-end encryption, "typing..." indikatori, "read receipts" — bularning barchasi murakkab distributed systems muammolari.
Protocol tanlash: WebSocket vs Long Polling vs SSE
Protocol | Direction | Latency | Chat uchun
WebSocket | Bi-directional | <10ms | ✅ Eng yaxshi — real-time, persistent
Long Polling | Unidirectional (pull) | 100-500ms | ⚠️ Faqat fallback sifatida
SSE | Server→Client only | <50ms | ⚠️ Faqat receive; yuborish uchun HTTP kerak
HTTP/2 Push | Server→Client | <50ms | ⚠️ Browser-only, limited support
Message delivery qanday kafolatlanadi?
At-least-once delivery va duplicate handling
Network ishonchsiz — xabar yuborildi, lekin ACK kelmasligi mumkin. Client retry qiladi — xabar ikki marta yetib kelishi mumkin. Yechim: har xabarga client_message_id (UUID yoki timestamp+random) qo'shiladi. Server yoki recipient bu ID'ni avval ko'rgan bo'lsa, duplicate deb e'tiborsiz qoldiradi (idempotent delivery). WhatsApp'dagi "bitta tick" = serverga yetib bordi, "ikkita tick" = recipient'ga delivered, "ko'k ikkita tick" = o'qildi.
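Dedup g'oyasi kodda — minimal sketch (production'da seen-ID'lar Redis SET'da TTL bilan saqlanadi, bu yerda oddiy in-memory set):

```python
class MessageDeduplicator:
    """client_msg_id bo'yicha duplicate'larni tashlab yuboradi (idempotent delivery)."""
    def __init__(self):
        self._seen: set[str] = set()  # production: Redis SET + TTL

    def accept(self, msg: dict) -> bool:
        """True — yangi xabar, False — duplicate (retry natijasi)."""
        msg_id = msg["client_msg_id"]
        if msg_id in self._seen:
            return False
        self._seen.add(msg_id)
        return True

dedup = MessageDeduplicator()
m = {"client_msg_id": "uuid-1", "text": "salom"}
print(dedup.accept(m))  # True — birinchi marta
print(dedup.accept(m))  # False — retry, e'tiborsiz qoldiriladi
```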
chat_server.py
import asyncio
import json
import uuid
from datetime import datetime

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import redis.asyncio as aioredis

# Quyida ishlatiladigan redis client
redis = aioredis.from_url("redis://localhost", decode_responses=True)
app = FastAPI()
# Connection registry — server ichida
# Production'da Redis Pub/Sub yoki Kafka bilan
class ConnectionManager:
def __init__(self):
self.active: dict[str, WebSocket] = {} # user_id → ws
async def connect(self, user_id: str, ws: WebSocket):
await ws.accept()
self.active[user_id] = ws
# User online deb belgilash
await redis.set(f"presence:{user_id}", "online", ex=30)
def disconnect(self, user_id: str):
self.active.pop(user_id, None)
async def send_to(self, user_id: str, message: dict) -> bool:
ws = self.active.get(user_id)
if ws:
try:
await ws.send_json(message)
return True
except Exception:
self.disconnect(user_id)
return False
manager = ConnectionManager()
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(ws: WebSocket, user_id: str):
await manager.connect(user_id, ws)
try:
# Offline vaqtdagi xabarlarni yetkazish
pending = await get_pending_messages(user_id)
for msg in pending:
await ws.send_json(msg)
await mark_delivered(msg["id"])
while True:
data = await ws.receive_json()
await handle_message(user_id, data)
except WebSocketDisconnect:
manager.disconnect(user_id)
await redis.delete(f"presence:{user_id}")
async def handle_message(sender_id: str, data: dict):
msg = {
"id": str(uuid.uuid4()),
"from": sender_id,
"to": data["to"],
"text": data["text"],
"timestamp": datetime.utcnow().isoformat(),
"status": "sent",
"client_msg_id": data.get("client_msg_id"), # dedup uchun
}
# 1. Cassandra'ga saqlash (message_history)
await save_message(msg)
# 2. Recipient onlinemi?
is_online = await redis.get(f"presence:{msg['to']}")
if is_online:
# 3a. Direct delivery (agar bu serverda)
delivered = await manager.send_to(msg["to"], msg)
if not delivered:
# 3b. Boshqa chat serverda — Kafka orqali
await kafka.send("messages", msg)
else:
# 3c. Push notification
await push_service.notify(
user_id=msg["to"],
title=f"Yangi xabar: {sender_id}",
body=msg["text"][:50]
)
18.3
news feed — design twitter/instagram feed
Social media feed'ni qanday loyihalash kerak? Fanout-on-write vs fanout-on-read?
javob
Feed nima uchun murakkab?
Elon Musk 170M follower'ga tweet qiladi. 170M odamning feed'ini yangilash kerak. Bitta tweet → 170M yozuv. Bu fanout muammosi — bir yozuvdan milliardlab natija. Instagram, Twitter yillarca shu muammoni hal qilishga harakat qildi. Yechim: hybrid approach — oddiy foydalanuvchilar uchun "fanout-on-write" (post qilganda hammaning feed'ini yangilaymiz), celebrity'lar uchun "fanout-on-read" (feed ochilganda celebrity postlarini pull qilamiz).
feed_service.py
CELEBRITY_THRESHOLD = 10_000 # follower soni
async def create_post(user_id: str, content: str) -> dict:
post = await db.insert_post(user_id=user_id, content=content,
created_at=time.time())
follower_count = await get_follower_count(user_id)
if follower_count < CELEBRITY_THRESHOLD:
# Fanout-on-write: barcha follower feed'ini yangilaymiz
# Async Celery task orqali (HTTP request'ni bloklamaymiz)
fanout_to_followers.delay(user_id=user_id, post_id=post.id)
else:
# Celebrity: faqat post saqlanadi, fan pull qiladi
# Hech narsa qilmaymiz — read vaqtida merge qilamiz
pass
return post
async def get_feed(user_id: str, page: int = 1) -> list:
    # 1. User'ning precomputed feed'ini (post_id ro'yxati) Redis'dan olamiz
    feed_key = f"feed:{user_id}"
    post_ids = await redis.lrange(feed_key, (page-1)*20, page*20-1)
    # post_id → to'liq post object (get_posts_by_ids — faraziy batch helper)
    feed_posts = await db.get_posts_by_ids(post_ids)
    # 2. User follow qilgan celebrity'lar postlarini merge qilamiz
    celebrities = await get_followed_celebrities(user_id)
    celebrity_posts = []
    for celeb_id in celebrities:
        posts = await db.get_recent_posts(user_id=celeb_id, limit=10)
        celebrity_posts.extend(posts)
    # 3. Merge, sort by timestamp
    all_posts = feed_posts + celebrity_posts
    all_posts.sort(key=lambda p: p["created_at"], reverse=True)
    return all_posts[:20]
@celery_app.task
async def fanout_to_followers(user_id: str, post_id: str):
"""Background task: 1000 ta follower uchun ham tez ishlashi kerak"""
followers = await get_followers_paginated(user_id, batch_size=100)
pipeline = redis.pipeline()
for follower_id in followers:
# Redis list'ga prepend (lpush), max 200 post saqlash (ltrim)
pipeline.lpush(f"feed:{follower_id}", post_id)
pipeline.ltrim(f"feed:{follower_id}", 0, 199)
await pipeline.execute()
18.4
consistent hashing — distributed data routing
Consistent hashing nima va Redis cluster, Cassandra, CDN'larda qanday ishlatiladi?
javob
Oddiy modulo hashing muammosi
4 ta server bor. Key uchun server: server = hash(key) % 4. Hammasi yaxshi — to'rtinchi server o'chdi. Endi: hash(key) % 3. Deyarli barcha key'lar boshqa serverlarga ko'chib ketadi — 75% cache miss, DB'ga hamma bir vaqtda hujum. Consistent hashing buni hal qiladi: server qo'shilganda yoki o'chirilganda faqat keylarning bir qismi ko'chadi.
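Muammoni raqamlarda ko'rish mumkin — 4 serverdan 3 ga tushganda modulo hashing'da keylarning qancha qismi boshqa serverga ko'chishini hisoblaymiz:

```python
import hashlib

def bucket(key: str, n: int) -> int:
    """Oddiy modulo hashing: key → server index."""
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % n

keys = [f"user:{i}" for i in range(10_000)]
moved = sum(1 for k in keys if bucket(k, 4) != bucket(k, 3))
print(f"{moved / len(keys):.0%} key ko'chdi")  # ~75% — deyarli hammasi
```

Nazariy jihatdan ham shunday: h mod 4 == h mod 3 faqat h mod 12 ∈ {0, 1, 2} bo'lganda, ya'ni 3/12 = 25% key joyida qoladi, 75% ko'chadi. Consistent hashing'da esa o'rtacha faqat 1/N qismi ko'chadi.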
consistent_hash.py
import hashlib

from sortedcontainers import SortedDict

class ConsistentHashRing:
    def __init__(self, virtual_nodes: int = 150):
        self.virtual_nodes = virtual_nodes
        self.ring: SortedDict = SortedDict()
        self.servers: set[str] = set()

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_server(self, server: str):
        self.servers.add(server)
        for i in range(self.virtual_nodes):
            vnode_key = self._hash(f"{server}:vnode:{i}")
            self.ring[vnode_key] = server

    def remove_server(self, server: str):
        self.servers.discard(server)
        for i in range(self.virtual_nodes):
            vnode_key = self._hash(f"{server}:vnode:{i}")
            self.ring.pop(vnode_key, None)

    def get_server(self, key: str) -> str | None:
        if not self.ring:
            return None
        key_hash = self._hash(key)
        # Soat yo'nalishi bo'yicha keyingi server
        idx = self.ring.bisect_left(key_hash)
        if idx == len(self.ring):
            idx = 0  # ring — aylana, oxiridan boshiga
        return self.ring.peekitem(idx)[1]

    def get_replica_servers(self, key: str, n: int = 3) -> list[str]:
        """N ta unique server (replication uchun)"""
        if not self.ring or len(self.servers) < n:
            return list(self.servers)
        key_hash = self._hash(key)
        idx = self.ring.bisect_left(key_hash)
        seen: set[str] = set()
        result: list[str] = []
        for _ in range(len(self.ring)):
            if idx >= len(self.ring):
                idx = 0
            server = self.ring.peekitem(idx)[1]
            if server not in seen:
                seen.add(server)
                result.append(server)
                if len(result) == n:
                    break
            idx += 1
        return result

# Ishlatish:
ring = ConsistentHashRing(virtual_nodes=150)
ring.add_server("cache-1:6379")
ring.add_server("cache-2:6379")
ring.add_server("cache-3:6379")

key = "user:12345:profile"
server = ring.get_server(key)                  # masalan, "cache-2:6379"
replicas = ring.get_replica_servers(key, n=2)  # 2 ta unique server — replication uchun
ⓘ
Real production'da ishlatiladi
Apache Cassandra — consistent hashing + virtual nodes asosida data distribution. Redis Cluster — 16384 hash slot (bir xil g'oya, lekin slot-based). Amazon DynamoDB — ichida consistent hashing. Nginx/HAProxy upstream hashing — session affinity uchun.
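Redis Cluster'ning slot-based varianti bir necha qatorda ko'rsatiladi: kalit CRC-16/XMODEM bilan hash'lanadi va 16384 slotdan biriga tushadi; `{hash-tag}` sintaksisi bilan bir nechta kalitni ataylab bitta slotga joylash mumkin (multi-key operatsiyalar uchun). Quyidagi kod — soddalashtirilgan illyustratsiya, rasmiy client kutubxonasi emas.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC-16/XMODEM — Redis Cluster ishlatadigan checksum (poly 0x1021, init 0)."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc

def key_hash_slot(key: str) -> int:
    """Redis Cluster slot: agar kalitda {tag} bo'lsa, faqat tag hash'lanadi."""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end != -1 and end != start + 1:  # bo'sh {} e'tiborga olinmaydi
            key = key[start + 1:end]
    return crc16_xmodem(key.encode()) % 16384

# {user:1} tag'i tufayli ikkala kalit bitta slotga tushadi:
print(key_hash_slot("{user:1}:profile"), key_hash_slot("{user:1}:orders"))
```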
19
ai architecture — advanced patterns
19.1
graphrag — knowledge graph + rag
Oddiy RAG nima uchun yetmaydi va GraphRAG qanday muammoni hal qiladi?
javob
Oddiy RAG'ning eng katta muammosi — multi-hop reasoning
Savol: "Yangi soliq qonuni bizning Yevropa ta'minot zanjirimizga qanday ta'sir qiladi?" — bu savolga javob 50 ta turli hujjatda tarqalgan: soliq qonuni hujjati, Yevropa shartnomalari, ta'minot zanjiri ma'lumotlari, logistika narxlari, risklar... Oddiy RAG eng o'xshash 5 ta chunk'ni topadi — ammo bu savolga javob berish uchun ularni bog'lab, sabab-natija zanjirini tushunish kerak. Vektor o'xshashligi bundan ojiz.
GraphRAG Microsoft tomonidan 2024-yilda open-source qilingan. G'oya: matnlardan entity va munosabatlar grafini qurish (Apple → CEO → Tim Cook, Tim Cook → born_in → Alabama). Savol kelganda vector search + graph traversal qo'shiladi — "multi-hop" reasoning mumkin bo'ladi.
Oddiy RAG vs GraphRAG — qachon qaysi?
Oddiy RAG yetarli: savol to'g'ridan-to'g'ri javobli ("ushbu shartnomada narx qancha?"), chunklarda mustaqil javob bor. GraphRAG kerak: munosabatlar muhim ("kompaniya kim bilan hamkor, bu hamkor kimga sotadi, u kim bilan bog'liq?"), "taqqoslash" savollari ("A va B kompaniya strategiyalari qanday farq qiladi?"), hierarchical summarization (butun corpus haqida savol).
graphrag_pipeline.py
"""
GraphRAG pipeline: Document → Entity extraction → Graph → Hybrid retrieval
"""
import json

from anthropic import AsyncAnthropic
from neo4j import AsyncGraphDatabase

client = AsyncAnthropic()

# ── 1. INDEXING: Entity va munosabatlarni chiqarib olish ──
# Eslatma: prompt'da literal JSON braces bor — str.format() ishlatib bo'lmaydi
# (braces'ni placeholder deb o'qiydi), shuning uchun matn konkatenatsiya bilan qo'shiladi.
EXTRACTION_PROMPT = """
Quyidagi matndan entity va munosabatlarni JSON formatda chiqar.
Format:
{"entities": [{"id": "apple", "type": "COMPANY", "name": "Apple Inc.", "props": {}}],
 "relationships": [{"from": "tim_cook", "to": "apple", "type": "CEO_OF", "props": {}}]}
Faqat JSON qaytar, boshqa narsa yo'q.

Matn:
"""

async def extract_entities(text: str) -> dict:
    response = await client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2000,
        messages=[{"role": "user", "content": EXTRACTION_PROMPT + text}]
    )
    return json.loads(response.content[0].text)

async def index_document(doc_text: str, doc_id: str, neo4j_session):
    """Hujjatni graph'ga qo'shish"""
    extracted = await extract_entities(doc_text)
    # Neo4j'ga entity'lar qo'shish.
    # Cypher'da label'ni parametr qilib bo'lmaydi — shuning uchun
    # entity turi property sifatida saqlanadi.
    for entity in extracted["entities"]:
        await neo4j_session.run(
            "MERGE (e:Entity {id: $id}) SET e += $props, e.type = $type",
            id=entity["id"], props=entity.get("props", {}), type=entity["type"]
        )
    # Munosabatlar qo'shish
    for rel in extracted["relationships"]:
        await neo4j_session.run(
            """
            MATCH (a:Entity {id: $from_id}), (b:Entity {id: $to_id})
            MERGE (a)-[r:RELATED {type: $rel_type}]->(b)
            SET r += $props
            """,
            from_id=rel["from"], to_id=rel["to"],
            rel_type=rel["type"], props=rel.get("props", {})
        )

# ── 2. QUERY: Hybrid Graph + Vector retrieval ──
async def hybrid_graphrag_query(question: str, neo4j_session, qdrant) -> str:
    # a) Savoldan entity'larni chiqar
    entities_in_q = await extract_entities(question)
    entity_ids = [e["id"] for e in entities_in_q.get("entities", [])]

    # b) Graph traversal — entity'lardan bog'liq tugunlarni topish (2 hop)
    graph_context = []
    if entity_ids:
        result = await neo4j_session.run(
            """
            MATCH (e:Entity)-[r*1..2]-(related:Entity)
            WHERE e.id IN $entity_ids
            RETURN e, r, related LIMIT 50
            """,
            entity_ids=entity_ids
        )
        graph_context = [record.data() async for record in result]

    # c) Vector search — semantic o'xshash chunk'lar
    q_embedding = await get_embedding(question)  # tashqi embedding helper
    vector_results = await qdrant.search(
        collection_name="documents",
        query_vector=q_embedding,
        limit=5
    )

    # d) Ikkalasini birlashtirish va LLM'ga berish
    context = f"""
Graph Context (Munosabatlar):
{format_graph_context(graph_context)}

Vector Context (O'xshash hujjatlar):
{chr(10).join(r.payload['text'] for r in vector_results)}
"""
    response = await client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1500,
        system="Siz berilgan context asosida aniq va to'liq javob beradigan yordamchisiz.",
        messages=[{"role": "user", "content": f"Context:\n{context}\n\nSavol: {question}"}]
    )
    return response.content[0].text
Aspekt | Naive RAG | GraphRAG
Multi-hop reasoning | ❌ yo'q | ✅ graf traversal
Entity relationships | ❌ chunk'lar izolyatsiyada | ✅ explicit edges
Hallucination risk | yuqori | pastroq (traced sources)
Setup murakkabligi | sodda | yuqori (Neo4j + extraction)
Indexing narxi | past | yuqori (har hujjatga LLM call)
Qachon tanlash | single-hop QA, docs search | complex relationships, compliance, supply chain
19.2
ai memory systems — short, long, episodic
AI agent qanday "eslab qoladi"? Memory architecture qanday quriladi?
javob
LLM stateless — u hech narsani eslamaydi
Har yangi API call — yangi boshlanish. LLM oldingi conversation'ni bilmaydi (context window'ga qo'shmasangiz). Bu agentic AI'ning eng katta muammosi: agent kun bo'yi ishlab, kechqurun sessiya yopilsa — ertasi kuni hammasi unutiladi. AI Memory — agentga "xotira" berish tizimi. Inson xotirasi kabi 3 qavatli: qisqa muddatli (session), uzoq muddatli (facts, preferences), epizodik (nimalar qilgani).
agent_memory.py
"""
Production AI Memory System — short-term, long-term, episodic
"""
import uuid
from datetime import datetime, timezone

from qdrant_client import AsyncQdrantClient

class AgentMemory:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.qdrant = AsyncQdrantClient("localhost", port=6333)
        self.short_term: list[dict] = []  # joriy session context
        self.max_short_term = 20          # max 20 ta turn

    # ── Short-term: conversation context ──
    def add_to_short_term(self, role: str, content: str):
        self.short_term.append({"role": role, "content": content})
        # Eski xabarlarni o'chirish (token limit uchun)
        if len(self.short_term) > self.max_short_term:
            # Birinchi ikkitasini (system) qoldirib, eng eski'larni o'chiramiz
            self.short_term = self.short_term[:2] + self.short_term[-(self.max_short_term - 2):]

    def get_short_term_context(self) -> list:
        return self.short_term.copy()

    # ── Long-term: vector DB'da saqlash ──
    async def remember(self, content: str, memory_type: str, metadata: dict | None = None):
        """Muhim ma'lumotni uzoq muddatli xotiraga yozish.
        Eslatma: mutable default ({}) o'rniga None — Python'dagi klassik tuzoqdan qochamiz."""
        embedding = await get_embedding(content)  # tashqi embedding helper
        await self.qdrant.upsert(
            collection_name="agent_memory",
            points=[{
                "id": str(uuid.uuid4()),
                "vector": embedding,
                "payload": {
                    "agent_id": self.agent_id,
                    "content": content,
                    "type": memory_type,  # "fact", "preference", "skill", "error"
                    "created_at": datetime.now(timezone.utc).isoformat(),
                    **(metadata or {})
                }
            }]
        )

    async def recall(self, query: str, memory_type: str | None = None, limit: int = 5) -> list:
        """Semantic qidirish — eng relevant xotiralarni qaytarish"""
        q_embedding = await get_embedding(query)
        filter_condition = {"must": [{"key": "agent_id", "match": {"value": self.agent_id}}]}
        if memory_type:
            filter_condition["must"].append({"key": "type", "match": {"value": memory_type}})
        results = await self.qdrant.search(
            collection_name="agent_memory",
            query_vector=q_embedding,
            query_filter=filter_condition,
            limit=limit,
            with_payload=True
        )
        return [r.payload for r in results]

    # ── Episodic: nima qilganini yozish ──
    async def log_action(self, action: str, result: str, success: bool):
        """Agent qilgan har bir ish yoziladi"""
        await db.insert({  # episodik log — oddiy relational jadvalga
            "agent_id": self.agent_id,
            "action": action,
            "result": result,
            "success": success,
            "timestamp": datetime.now(timezone.utc).isoformat()
        })
        # Xatoni uzoq muddatli xotiraga ham yozamiz (keyingi safar qaytarmaslik uchun)
        if not success:
            await self.remember(
                f"Xato: {action} → {result}",
                memory_type="error",
                metadata={"avoid_repeat": True}
            )

    # ── Full context builder ──
    async def build_context(self, current_query: str) -> str:
        """Agent uchun to'liq kontekst yig'ish"""
        # 1. Relevant uzoq muddatli xotiralar
        long_term = await self.recall(current_query, limit=3)
        # 2. Relevant xatolar (bir xil xatoni qaytarmasin)
        errors = await self.recall(current_query, memory_type="error", limit=2)
        context_parts = []
        if long_term:
            context_parts.append("Oldingi tajribalarim:\n" +
                "\n".join(f"- {m['content']}" for m in long_term))
        if errors:
            context_parts.append("Ilgari qilgan xatolar (takrorlamang):\n" +
                "\n".join(f"- {e['content']}" for e in errors))
        return "\n\n".join(context_parts) if context_parts else ""

# Ishlatish misoli:
memory = AgentMemory(agent_id="agent-001")

# Yangi ma'lumot o'rganildi
await memory.remember(
    "Foydalanuvchi Python'ni afzal ko'radi, JavaScript'dan qochadi",
    memory_type="preference"
)

# Vazifa bajarildi
await memory.log_action(
    action="Write unit tests for auth module",
    result="12 ta test yozildi, 11 ta pass, 1 ta fail (JWT expiry edge case)",
    success=False
)

# Yangi savol keldi — contextni yig'amiz
context = await memory.build_context("Auth module'da yangi feature qo'shish kerak")
# context ichida: preference (Python) va xato (JWT expiry) bor
19.3
slm, edge ai va mixture of experts (moe)
Hamma GPT-4 ishlatish shart emas — SLM va MoE qanday ishlaydi?
javob
Katta model = har doim yaxshi emas
2024-2025'da muhim o'zgarish: "bigger is better" dogmasi buzildi. Phi-4 (14B) — Microsoft SLM, GPT-4 darajasida benchmark'larda, lekin 10x kichik. Gemma 3 (4B) — Google, smartphone'da ishlaydi. Llama 3.2 (1B-3B) — edge device'larda. Nega muhim? Kichik model: privacy (data server'ga chiqmaydi), latency (local = sub-50ms), cost (API narxi yo'q), offline (internet kerak emas).
MoE — Mixture of Experts — GPT-4 va Mixtral'ning siri
Klassik transformer: har token barcha 70B parametrdan o'tadi. MoE g'oyasi: 8 ta "expert" model bor (har biri 7B), har token uchun faqat 2 ta eng relevant expert aktivlashadi. Natijada: 56B umumiy parametr, lekin har qadamda faqat 14B ishlatiladi. GPT-4 ham MoE arxitekturasi (rasmiy tasdiqlanmagan, lekin keng ishoniladi). Mixtral 8x7B — birinchi open-source MoE, 56B param lekin 14B "active". Inference narxi 4x past, accuracy yaxshi.
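Top-k gating g'oyasini bir necha qatorda ko'rsatish mumkin. Quyidagi — faraziy, juda soddalashtirilgan sketch: real MoE'da gating'ni neural tarmoq o'rganadi va expert'lar FFN bloklari bo'ladi; bu yerda expert'lar oddiy funksiyalar, gating esa tayyor logit'lar ustida softmax.

```python
import math

def top_k_gating(logits: list[float], k: int = 2) -> list[tuple[int, float]]:
    """Eng yuqori k ta expert indeksini tanlab, og'irliklarni renormalizatsiya qiladi."""
    top = sorted(range(len(logits)), key=lambda i: logits[i], reverse=True)[:k]
    exps = [math.exp(logits[i]) for i in top]
    z = sum(exps)
    return [(i, e / z) for i, e in zip(top, exps)]

def moe_forward(x: float, experts, gate_logits: list[float], k: int = 2) -> float:
    """Faqat tanlangan k ta expert chaqiriladi — qolganlari 'uxlaydi'."""
    routed = top_k_gating(gate_logits, k)
    return sum(w * experts[i](x) for i, w in routed)

# 4 ta "expert", lekin har token uchun faqat 2 tasi aktivlashadi:
experts = [lambda x: 1.0 * x, lambda x: 2.0 * x, lambda x: 3.0 * x, lambda x: 4.0 * x]
print(moe_forward(1.0, experts, gate_logits=[2.0, 1.0, 0.0, -1.0], k=2))
```

Aynan shu "sparse activation" tufayli 56B parametrli model inference'da ~14B modeldek narxlanadi.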
Model tanlash guide
Model | Params | Context | Use case | Narx (1M tok)
Claude Opus 4.6 | ~200B+ | 200K | complex reasoning, agentic | $15 input / $75 output
Claude Sonnet 4.6 | ~70B | 200K | production workloads (best value) | $3 / $15
GPT-4o mini | ~8B MoE | 128K | simple tasks, high volume | $0.15 / $0.60
Llama 3.3 (70B) | 70B | 128K | self-hosted, privacy required | infra narxi
Phi-4 (14B) | 14B | 16K | edge, coding tasks, fast | bepul (local)
Gemma 3 (4B) | 4B | 128K | mobile, IoT, ultra-low latency | bepul (local)
Mixtral 8x22B | 141B / 39B active | 64K | self-host, high quality, MoE | infra narxi
Smart Model Router — 80% narxni tejash
Har savol ham GPT-4'ni talab qilmaydi
"Salom qanday?" — Phi-4 bilan javob bera oladi (0.001$). "Butun codebase'ni tahlil qilib bug topib ber" — Claude Opus kerak (0.50$). Savol murakkabligiga qarab model tanlash: classifier (kichik model) savolni baholaydi → routing qaroriga qarab mos model'ga yuboriladi. Production'da 80% savollar "easy" — kichik model bilan yopiladi, faqat 20% "hard" savollar katta modelga ketadi.
smart_router.py
import json

from anthropic import AsyncAnthropic
from openai import AsyncOpenAI

client = AsyncAnthropic()
openai_client = AsyncOpenAI()

# Eslatma: prompt'da literal JSON braces bor — str.format() ishlatib bo'lmaydi,
# shuning uchun savol konkatenatsiya bilan qo'shiladi.
DIFFICULTY_CLASSIFIER_PROMPT = """
Quyidagi savolning murakkabligini baholang. Faqat JSON qaytaring:
{"difficulty": "easy" | "medium" | "hard",
 "reasoning": "bir jumlada sabab",
 "requires_code": true/false,
 "requires_reasoning": true/false}

Easy: oddiy savol, faktual, qisqa javob
Medium: tushuntirish kerak, bir necha qadam
Hard: kompleks reasoning, katta kontekst, kod tahlil, multi-step

Savol:
"""

async def classify_difficulty(question: str) -> dict:
    """Kichik model bilan savolni baholash"""
    response = await openai_client.chat.completions.create(
        model="gpt-4o-mini",  # arzon classifier
        messages=[{"role": "user",
                   "content": DIFFICULTY_CLASSIFIER_PROMPT + question}],
        max_tokens=100,
        temperature=0
    )
    return json.loads(response.choices[0].message.content)

async def smart_route(question: str, context: str = "") -> tuple[str, str]:
    """
    Returns: (answer, model_used)
    """
    classification = await classify_difficulty(question)
    difficulty = classification["difficulty"]
    needs_reasoning = classification.get("requires_reasoning", False)
    needs_code = classification.get("requires_code", False)

    # Routing mantig'i
    if difficulty == "easy" and not needs_reasoning:
        # Eng arzon: GPT-4o mini ($0.15/1M tokens)
        model = "gpt-4o-mini"
        response = await openai_client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": question}],
            max_tokens=500
        )
        answer = response.choices[0].message.content
    elif difficulty == "medium" or needs_code:
        # O'rta narx: Claude Sonnet ($3/1M tokens)
        model = "claude-sonnet-4-6"
        response = await client.messages.create(
            model=model,
            max_tokens=1500,
            messages=[{"role": "user", "content": question}]
        )
        answer = response.content[0].text
    else:  # hard yoki complex reasoning
        # Eng qimmat: Claude Opus ($15/1M tokens) — faqat kerak bo'lganda
        model = "claude-opus-4-6"
        response = await client.messages.create(
            model=model,
            max_tokens=4000,
            system="Siz ekspert muhandissiz. Kompleks muammolarni chuqur tahlil qiling.",
            messages=[{"role": "user", "content": f"{context}\n\n{question}" if context else question}]
        )
        answer = response.content[0].text

    # Cost tracking (Langfuse yoki custom)
    await track_usage(model=model, tokens_used=estimate_tokens(question + answer),
                      difficulty=difficulty)
    return answer, model

# Natijalar (taxminiy taqsimot):
# 80% savollar → gpt-4o-mini ($0.15/1M)
# 15% → claude-sonnet ($3/1M)
# 5% → claude-opus ($15/1M)
# O'rtacha input narxi: 0.80*0.15 + 0.15*3 + 0.05*15 ≈ $1.3/1M —
# hammasini opus'ga yuborishga ($15/1M) nisbatan ~11x tejash
◈
AI engineering asosiy tamoyili
Har savol uchun eng katta modelni ishlatish — pul yo'qotish. Smart routing + caching + SLM-for-simple-tasks kombinatsiyasi bilan 80-90% narx tejash mumkin, foydalanuvchi sifati yo'qolmaydi. Bu eng muhim LLMOps pattern'laridan biri.
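Tamoyildagi "caching" qismini minimal sketch sifatida ko'rsatish mumkin. Quyidagi kod faraziy: in-memory exact-match cache (normalized prompt → javob); production'da bu odatda Redis + TTL, murakkabroq variantlarda semantic cache (embedding o'xshashligi bo'yicha) bo'ladi.

```python
import hashlib

class LLMResponseCache:
    """Bir xil savolga LLM'ni qayta chaqirmaslik — eng sodda exact-match cache."""

    def __init__(self):
        self._store: dict[str, str] = {}
        self.hits = 0
        self.misses = 0

    def _key(self, model: str, prompt: str) -> str:
        # Normalizatsiya: katta-kichik harf va ortiqcha bo'shliqlar farq qilmasin
        normalized = " ".join(prompt.lower().split())
        return hashlib.sha256(f"{model}:{normalized}".encode()).hexdigest()

    async def get_or_call(self, model: str, prompt: str, call_llm) -> str:
        key = self._key(model, prompt)
        if key in self._store:
            self.hits += 1
            return self._store[key]
        self.misses += 1
        answer = await call_llm(model, prompt)  # call_llm — tashqi async funksiya
        self._store[key] = answer
        return answer
```

Takroriy savollar ulushi yuqori bo'lgan chat/FAQ workload'larda shu oddiy qatlam ham API xarajatining sezilarli qismini kesadi.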
Part VIII
software architecture patterns
Design patterns, microservices, trade-off'lar va muhandislik tamoyillari. Kod yozish — muhandislikning kichik bir qismi. Katta tizimlarni loyihalash, refactor qilish, jamoa bilan ishlash — bular professional muhandisning asosiy mahorati.
20
design patterns — production'da ishlatiluvchi
20.1
creational patterns — ob'ekt yaratish
Singleton, Factory, Builder — qachon va qanday ishlatiladi?
javob
Design pattern nima va nima uchun kerak?
Design pattern — takror uchraydigan muammolarga isbotlangan yechimlar. Ixtiro qilish shart emas — 1994-yilda "Gang of Four" 23 ta klassik pattern'ni hujjatlashtirgan. Bularni bilish: boshqalar yozgan kodni tushunish (masalan, "bu Factory pattern"), muloqotni qisqartirish ("Singleton ishlataylik" — uzun tushuntirish kerak emas), yaxshi arxitektura tanlash.
Singleton — bitta nusxa
Singleton qachon kerak?
Tizimda faqat bitta nusxa bo'lishi kerak bo'lgan narsalar: DB connection pool, konfigurasiya, logging instance. Python'da oddiy — module-level variable avtomatik singleton. Thread-safe singleton uchun esa ehtiyotkorlik kerak.
patterns/singleton.py
from functools import lru_cache
from threading import Lock

# ── Python'da eng sodda Singleton — module import ──
# config.py
class _Config:
    def __init__(self):
        self.db_url = "postgres://..."
        self.debug = False

_instance = _Config()  # Module import'da bir marta yaratiladi

def get_config() -> _Config:
    return _instance  # Har doim bir xil obyekt

# ── Thread-safe Singleton (generic) ──
class Singleton:
    _instance = None
    _lock: Lock = Lock()

    def __new__(cls):
        with cls._lock:
            if not cls._instance:
                cls._instance = super().__new__(cls)
        return cls._instance

# ── FastAPI'da: lru_cache bilan ──
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    db_url: str
    api_key: str

@lru_cache  # Bir marta ishlaydi, natijani cache'laydi = Singleton
def get_settings() -> Settings:
    return Settings()

# ── Singleton anti-pattern: global state testing'ni qiyinlashtiradi ──
# Testing uchun yaxshiroq: Dependency Injection
# class MyService:
#     def __init__(self, config: Config):  # inject
#         self.config = config
Factory — konkret sinfni client'dan yashirish
"Qaysi konkret sinf kerak" qarorini foydalanuvchidan yashirish. Masalan: payment provider tanlash — Stripe, Payme, Click — har biri alohida implementatsiya, lekin bir xil interfeys. Client faqat PaymentFactory.create("payme") deydi; qaysi sinf ishlatilishini Factory hal qiladi.
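Yuqoridagi PaymentFactory'ni minimal sketch sifatida yozish mumkin. Bu faraziy misol: StripeProvider/PaymeProvider sinflari shartli, real SDK chaqiruvlari o'rniga oddiy string qaytaradi.

```python
from abc import ABC, abstractmethod

class PaymentProvider(ABC):
    """Barcha provider'lar uchun umumiy interfeys."""
    @abstractmethod
    def charge(self, amount: int) -> str: ...

class StripeProvider(PaymentProvider):
    def charge(self, amount: int) -> str:
        return f"stripe: {amount} so'm charge qilindi"

class PaymeProvider(PaymentProvider):
    def charge(self, amount: int) -> str:
        return f"payme: {amount} so'm charge qilindi"

class PaymentFactory:
    # Registry — yangi provider qo'shish = bitta qator
    _registry: dict[str, type[PaymentProvider]] = {
        "stripe": StripeProvider,
        "payme": PaymeProvider,
    }

    @classmethod
    def create(cls, name: str) -> PaymentProvider:
        try:
            return cls._registry[name]()
        except KeyError:
            raise ValueError(f"Noma'lum payment provider: {name}") from None

# Client kodi konkret sinfni bilmaydi:
provider = PaymentFactory.create("payme")
print(provider.charge(50_000))
```

Registry-based variantning afzalligi: if/elif zanjiri o'rniga dict — Open/Closed tamoyiliga yaqinroq.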
Builder — murakkab obyektni method chaining bilan qurish
from typing import Self

# ── SQL Query Builder ──
class QueryBuilder:
    """Method chaining bilan SQL query yaratish"""

    def __init__(self, table: str):
        self._table = table
        self._conditions: list[str] = []
        self._columns: list[str] = ["*"]
        self._limit: int | None = None
        self._offset: int = 0
        self._order: str | None = None
        self._params: list = []

    def select(self, *columns: str) -> Self:
        self._columns = list(columns)
        return self

    def where(self, condition: str, *values) -> Self:
        # Eslatma: bitta condition ichida bitta "?" placeholder nazarda tutilgan
        param_start = len(self._params) + 1
        self._conditions.append(condition.replace("?", f"${param_start}"))
        self._params.extend(values)
        return self

    def limit(self, n: int) -> Self:
        self._limit = n
        return self

    def offset(self, n: int) -> Self:
        self._offset = n
        return self

    def order_by(self, column: str, direction: str = "ASC") -> Self:
        self._order = f"{column} {direction}"
        return self

    def build(self) -> tuple[str, list]:
        cols = ", ".join(self._columns)
        sql = f"SELECT {cols} FROM {self._table}"
        if self._conditions:
            sql += " WHERE " + " AND ".join(self._conditions)
        if self._order:
            sql += f" ORDER BY {self._order}"
        if self._limit is not None:
            sql += f" LIMIT {self._limit}"
        if self._offset:
            sql += f" OFFSET {self._offset}"
        return sql, self._params

# ── Ishlatish ──
query, params = (
    QueryBuilder("orders")
    .select("id", "status", "total")
    .where("tenant_id = ?", tenant_id)
    .where("status = ?", "active")
    .order_by("created_at", "DESC")
    .limit(20)
    .offset(40)
    .build()
)
# SELECT id, status, total FROM orders
# WHERE tenant_id = $1 AND status = $2
# ORDER BY created_at DESC LIMIT 20 OFFSET 40
20.2
structural & behavioral patterns — zamonaviy
Repository, Strategy, Observer, Decorator — backend'da qanday qo'llanadi?
javob
Repository Pattern — DB logikasini ajratish
Repository nima uchun muhim?
Service layer to'g'ridan-to'g'ri DB bilan gapirsa — test yozish qiyin (real DB kerak), DB o'zgarsa (PostgreSQL → MongoDB) — butun service qayta yoziladi. Repository — ma'lumotlar bilan ishlash logikasini alohida qatlam. Service faqat nima istashini aytadi, Repository qanday olishni biladi.
patterns/repository.py
import uuid
from abc import ABC, abstractmethod

# ── Abstract interface ──
class OrderRepository(ABC):
    @abstractmethod
    async def get_by_id(self, order_id: str) -> Order | None: ...

    @abstractmethod
    async def get_by_tenant(self, tenant_id: str, limit: int, offset: int) -> list[Order]: ...

    @abstractmethod
    async def create(self, data: OrderCreate, user_id: str) -> Order: ...

    @abstractmethod
    async def update_status(self, order_id: str, status: str) -> Order: ...

# ── PostgreSQL implementatsiyasi ──
class PostgresOrderRepository(OrderRepository):
    def __init__(self, db):
        self.db = db

    async def get_by_id(self, order_id: str) -> Order | None:
        row = await self.db.fetchrow(
            "SELECT * FROM orders WHERE id = $1", order_id
        )
        return Order(**dict(row)) if row else None

    async def get_by_tenant(self, tenant_id, limit, offset) -> list[Order]:
        rows = await self.db.fetch(
            """SELECT * FROM orders WHERE tenant_id = $1
               ORDER BY created_at DESC LIMIT $2 OFFSET $3""",
            tenant_id, limit, offset
        )
        return [Order(**dict(r)) for r in rows]

    async def create(self, data: OrderCreate, user_id: str) -> Order:
        row = await self.db.fetchrow(
            """INSERT INTO orders (id, user_id, product_id, quantity, status)
               VALUES (gen_random_uuid(), $1, $2, $3, 'pending')
               RETURNING *""",
            user_id, data.product_id, data.quantity
        )
        return Order(**dict(row))

    async def update_status(self, order_id: str, status: str) -> Order:
        row = await self.db.fetchrow(
            "UPDATE orders SET status=$1, updated_at=NOW() WHERE id=$2 RETURNING *",
            status, order_id
        )
        return Order(**dict(row))

# ── In-memory implementatsiyasi (testing uchun!) ──
# ABC'dagi BARCHA abstract metodlar implement qilinishi shart,
# aks holda sinfni instantiate qilib bo'lmaydi.
class InMemoryOrderRepository(OrderRepository):
    def __init__(self):
        self._store: dict[str, Order] = {}

    async def get_by_id(self, order_id: str) -> Order | None:
        return self._store.get(order_id)

    async def get_by_tenant(self, tenant_id, limit, offset) -> list[Order]:
        orders = [o for o in self._store.values() if o.tenant_id == tenant_id]
        return orders[offset:offset + limit]

    async def create(self, data: OrderCreate, user_id: str) -> Order:
        order = Order(id=str(uuid.uuid4()), user_id=user_id, **data.dict())
        self._store[order.id] = order
        return order

    async def update_status(self, order_id: str, status: str) -> Order:
        order = self._store[order_id]
        order.status = status
        return order

# ── Service — Repository inject qilinadi ──
class OrderService:
    def __init__(self, repo: OrderRepository):  # Abstract type!
        self.repo = repo

    async def create_order(self, data: OrderCreate, user: User) -> Order:
        # Biznes logikasi faqat shu yerda
        if data.quantity > 100:
            raise ValueError("Maximum 100 ta buyurtma")
        order = await self.repo.create(data, user.id)
        await self.notify_warehouse(order)
        return order

# Dependency injection:
# Production: OrderService(PostgresOrderRepository(db))
# Test: OrderService(InMemoryOrderRepository())
Monolith qachon microservices'ga aylantiriladi va qanday bo'linadi?
javob
Eng ko'p qilinadigan xato — erta microservices
Deyarli hamma startup microservices bilan boshlaydi — bu xato. Microservices tarqatilgan tizim murakkabliklarini olib keladi: service discovery, network latency, distributed tracing, data consistency. 10 ta developer'gacha bo'lgan jamoa uchun yaxshi arxitekturalangan monolith ko'pincha yaxshiroq. Amazon'ning o'zi ham 2002-yilda monolith'da boshlagan, keyinroq bo'lingan.
Qachon bo'lish kerak: jamoa 50+ bo'lganda (Conway's Law), bir qism alohida scale qilish kerak bo'lganda (masalan, ML inference 100x ko'proq resurs kerak), deploy tezligini oshirish uchun (independent deployment), texnologiya farqi kerak bo'lganda (Go service + Python ML).
Microservices decomposition — qanday bo'linadi?
Domain-Driven Design (DDD) — to'g'ri bo'linish usuli
Texnik chiziqlar bo'yicha emas (database, UI, API) — biznes domeniga ko'ra bo'lish. Bounded Context: "Orders" jarayoni — buyurtma yaratish, kuzatish, bekor qilish. Bu bitta service. "Payments" — to'lov, qaytarish, cheklar. Bu alohida service. Ular bir-birini Kafka event orqali xabardor qiladi, to'g'ridan-to'g'ri DB'ga kirmaydi.
Bo'linish tamoyili | Izoh | Misol
Business capability | biznes funksiyasiga ko'ra | Orders, Payments, Inventory, Auth
Domain | DDD Bounded Context | Customer, Catalog, Fulfillment
Team ownership | Conway's Law — bir jamoa = bir service | backend team → core API, ML team → inference
Scale requirement | alohida scale kerak bo'lganlar | ML inference → GPU pods, API → CPU pods
Technology fit | har service o'z stack'ida | Go auth (tez), Python ML, Node realtime
Service communication patterns
service_communication.py
"""
Microservices o'rtasida kommunikatsiya:
1. Sync REST/gRPC — darhol javob kerak
2. Async Kafka — eventual consistency OK
3. Service mesh — mTLS, retry, circuit breaker
"""
import grpc
import httpx

# ── 1. Sync gRPC — internal service call ──
from proto import orders_pb2, orders_pb2_grpc

async def get_order_status(order_id: str) -> str:
    """Sync call — javob darhol kerak"""
    async with grpc.aio.insecure_channel("orders-service:50051") as channel:
        stub = orders_pb2_grpc.OrdersStub(channel)
        response = await stub.GetOrder(
            orders_pb2.GetOrderRequest(order_id=order_id),
            timeout=2.0  # 2 sekund timeout
        )
        return response.status

# ── 2. Circuit Breaker ──
from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=30)
async def call_payment_service(order_id: str, amount: float):
    """5 marta fail bo'lsa — 30 sekund "open" holatda"""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://payment-service/charge",
            json={"order_id": order_id, "amount": amount},
            timeout=5.0
        )
        response.raise_for_status()
        return response.json()

# ── 3. Async via Kafka ──
async def process_order_completed(order: Order):
    """Order tugagach — async fan-out"""
    # Boshqa service'larni async xabardor qilish
    await kafka.send("order.completed", {
        "order_id": str(order.id),
        "user_id": str(order.user_id),
        "amount": float(order.total),
        "items": [{"product_id": str(i.product_id), "qty": i.quantity}
                  for i in order.items]
    })
    # Har service o'z tezligida o'qiydi:
    # - notification-service: email yuboradi
    # - inventory-service: stokni kamaytiradi
    # - analytics-service: statistika yangilaydi

# ── 4. Saga — distributed transaction ──
class OrderSaga:
    """Kompensatsiya bilan distributed transaction"""

    def __init__(self):
        # Muhim: instance-level list. Class-level mutable attribute bo'lsa,
        # kompensatsiyalar barcha saga'lar o'rtasida bo'lishib ketadi (bug).
        self.compensations = []

    async def execute(self, order_data):
        try:
            # Step 1: Payment
            payment = await payment_service.charge(order_data)
            self.compensations.append(
                lambda: payment_service.refund(payment.id)
            )
            # Step 2: Inventory
            reservation = await inventory_service.reserve(order_data)
            self.compensations.append(
                lambda: inventory_service.release(reservation.id)
            )
            # Step 3: Order create
            order = await order_service.create(order_data, payment.id)
            return order
        except Exception:
            # Bajarilgan qadamlargacha teskari tartibda kompensatsiya
            for compensate in reversed(self.compensations):
                await compensate()
            raise
21.2
api gateway, service mesh, observability
Microservices infratuzilmasi qanday boshqariladi — service discovery, tracing, health?
javob
Microservices "vertikal" muammolari
Har service uchun alohida: auth middleware, rate limiting, logging, retry logic, TLS — bu cross-cutting concerns. Har service'da takrorlash — nightmare. Yechim: API Gateway (tashqi traffic) va Service Mesh (ichki traffic). Gateway: bitta kirish nuqtasi — routing, auth, rate limit. Service Mesh (Istio, Linkerd): sidecar proxy pattern — har pod'ga proxy, service'lar o'rtasidagi traffic nazorat ostida.
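Gateway bajaradigan cross-cutting ishlardan biri — rate limiting. Quyida minimal token bucket sketchi: faraziy, in-memory misol (production gateway'larda bu odatda Redis'da yoki Envoy/Kong darajasida, har client kaliti uchun alohida bucket bilan amalga oshiriladi).

```python
import time

class TokenBucket:
    """Gateway darajasidagi per-client rate limiting uchun minimal token bucket."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate                  # sekundiga qo'shiladigan tokenlar
        self.capacity = capacity          # burst limiti
        self.tokens = float(capacity)     # boshlanishda to'la
        self.last = time.monotonic()

    def allow(self) -> bool:
        """True — request o'tadi; False — 429 Too Many Requests qaytariladi."""
        now = time.monotonic()
        # O'tgan vaqtga proporsional token qo'shamiz, capacity'dan oshirmaymiz
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# Har client uchun alohida bucket (faraziy in-memory registry):
buckets: dict[str, TokenBucket] = {}

def check_rate_limit(client_id: str, rate: float = 10.0, capacity: int = 20) -> bool:
    bucket = buckets.setdefault(client_id, TokenBucket(rate, capacity))
    return bucket.allow()
```

Token bucket'ning afzalligi fixed window'ga nisbatan: qisqa burst'larga ruxsat beradi, lekin o'rtacha tezlikni `rate` darajasida ushlab turadi.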
Distributed tracing — request path kuzatish
tracing_setup.py
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

def setup_tracing(app: FastAPI, service_name: str):
    """OpenTelemetry — auto-instrumentation + manual spans"""
    # Exporter — Jaeger yoki Tempo'ga yuborish
    exporter = OTLPSpanExporter(endpoint="http://jaeger:4317")
    provider = TracerProvider(
        resource=Resource.create({"service.name": service_name})
    )
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    # Auto-instrumentation — FastAPI, asyncpg, httpx
    FastAPIInstrumentor.instrument_app(app)
    AsyncPGInstrumentor().instrument()
    HTTPXClientInstrumentor().instrument()

# ── Manual span — muhim funksiya uchun ──
tracer = trace.get_tracer(__name__)

async def process_order(order_id: str):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        # DB span avtomatik qo'shiladi (asyncpg instrumentation)
        order = await db.fetchrow("SELECT * FROM orders WHERE id = $1", order_id)

        with tracer.start_as_current_span("validate_inventory"):
            is_available = await check_inventory(order["product_id"])
            span.set_attribute("inventory.available", is_available)

        if not is_available:
            span.set_attribute("order.cancelled", True)
            span.set_status(trace.Status(trace.StatusCode.ERROR))
            raise OutOfStockError(order["product_id"])
        return order

# Jaeger'da ko'rinishi (taxminiy trace):
# /api/v1/orders/create → [5ms]
#   → Auth middleware [1ms]
#   → process_order [12ms]
#     → DB query [4ms]
#     → validate_inventory [6ms]
#       → inventory-service HTTP call [5ms]
ⓘ
Microservices anti-patterns — qochish kerak
Distributed monolith: service'lar alohida deploy qilinadi, lekin bir-biriga sinxron bog'liq. Bitta tushsa — hamma tushadi. Shared database: bir nechta service bitta DB'ga yozsa — o'zaro to'sib qo'yadi, migrate qilish imkonsiz. Nano-services: har funksiya alohida service — network overhead gigantik. Versioning yo'q: service API'sini versiyalamay o'zgartirish — consumer'lar sinadi.