~/backend_ai_architecture status: online
BACKEND & AI ARCHITECTURE
sys:backend_ai lang:uzbek mode:production
author:@komiljon_xamidjonov lang:uz scope:all-in-one

A complete guide to building production-grade backend systems, from distributed systems fundamentals to LLMs, RAG, and agentic AI. With code samples taken from real projects, interactive diagrams, and in-depth explanations.

Part I

distributed systems foundations

Every modern backend architecture starts here: tenants, SaaS, auth, API design, caching. These are the foundation for everything that follows — and AI systems live by the same rules.

01

core concepts & terminology

1.1

tenant, saas, multi-tenancy — the full picture

Tenant, SaaS, multi-tenancy, B2B/B2C — how are these related, and how do they work?
answer
A simple explanation

Tenant = "renter". You own the building and rent it out to many companies. Each company sees its own office (its data), and nobody else's.

SaaS = Software as a Service. An application used in the browser (Gmail, Slack, Notion). The customer installs nothing.

Multi-tenant SaaS = one application, a thousand customers. Each gets its own isolated workspace.

The 4 core components of a SaaS architecture

Tenant A users (company.com) · Tenant B users (acme.com) · Tenant C users (startup.io)
  • CONTROL PLANE — billing, admin, signup, tenant management
  • DATA PLANE — the application runtime (your FastAPI/Django); tenant_id is mandatory!
  • Metadata DB — tenants, users, billing
  • Tenant DBs — isolated data per tenant, per the chosen strategy
  • Shared services — cache, queue, storage; observability — per-tenant metrics
Every tenant enters through the data plane — but sees only its own data. The control plane manages the tenants.

Multi-tenancy isolation — the 3 main strategies

Strategy 1 — Database-per-tenant

A separate DB instance for every tenant. The strongest isolation, but the most expensive to operate.

db_router.py
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

class TenantDBRouter:
    """A separate connection per tenant"""
    def __init__(self):
        self._engines: dict[str, AsyncEngine] = {}
    
    async def get_engine(self, tenant_id: str) -> AsyncEngine:
        if tenant_id not in self._engines:
            # Look up the tenant's connection string in the metadata DB
            conn_str = await self._resolve_tenant_db(tenant_id)
            self._engines[tenant_id] = create_async_engine(
                conn_str, pool_size=5, max_overflow=10
            )
        return self._engines[tenant_id]
    
    async def _resolve_tenant_db(self, tenant_id: str) -> str:
        # lookup in the metadata DB
        tenant = await metadata_db.fetch_one(
            "SELECT db_host, db_name FROM tenants WHERE id = $1", tenant_id
        )
        return f"postgresql+asyncpg://{tenant.db_host}/{tenant.db_name}"

When to use it: regulatory requirements (HIPAA, banking), fewer than ~50 large enterprise customers, or when each customer needs its own backup/restore.

Strategy 2 — Schema-per-tenant

One PostgreSQL instance, with a separate schema (a group of tables) per tenant.

schema_strategy.sql
-- One schema per tenant
CREATE SCHEMA tenant_acme;
CREATE SCHEMA tenant_globex;

-- Tables inside each schema
CREATE TABLE tenant_acme.orders (id UUID PRIMARY KEY, amount DECIMAL);
CREATE TABLE tenant_globex.orders (id UUID PRIMARY KEY, amount DECIMAL);

-- Set search_path at query time
SET search_path TO tenant_acme;
SELECT * FROM orders;  -- resolves to tenant_acme.orders

Advantage: one DB means one backup. Drawback: PostgreSQL slows down beyond ~10,000 schemas, and migrations must be run separately for every schema.

Strategy 3 — Shared schema with tenant_id

One DB, one schema, a tenant_id column in every table. The most widely used approach.

shared_schema.sql
CREATE TABLE orders (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL,
    user_id UUID NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    status TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- IMPORTANT: every composite index starts with tenant_id
    CONSTRAINT fk_tenant FOREIGN KEY (tenant_id) REFERENCES tenants(id)
);

-- tenant_id always comes first!
CREATE INDEX idx_orders_tenant_status 
    ON orders(tenant_id, status) 
    WHERE status != 'archived';

CREATE INDEX idx_orders_tenant_user_created 
    ON orders(tenant_id, user_id, created_at DESC);

-- RLS — the last line of defense
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON orders
    USING (tenant_id = current_setting('app.tenant_id')::uuid);

Selection matrix — the real world

Criterion | DB-per-tenant | Schema-per-tenant | Shared schema
Isolation | Highest | High | Medium (with RLS)
Cost per tenant | $$$ | $$ | $
Scale limit | ~100 tenants | ~1000 schemas | Millions
Noisy neighbor | None | Low | High (needs mitigation)
Migrations | Per DB | Per schema | Single run
Cross-tenant analytics | Hard | Medium | Easy
Use case | Healthcare, banking | B2B SMB | Consumer SaaS
The trend — a hybrid approach

The modern SaaS pattern: shared schema by default, but a dedicated DB for premium enterprise customers. This is the "silver tier" architecture — cheap for standard customers, full isolation for enterprise. Snowflake, Databricks, and Notion all take this route.
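That hybrid routing decision can be sketched as a tiny resolver — `TENANT_TIERS` and the DSN strings below are hypothetical placeholders, not part of any real deployment:

```python
# Hybrid routing: shared DSN by default, a dedicated DSN for enterprise tenants.
# TENANT_TIERS and the DSN templates are illustrative assumptions.
SHARED_DSN = "postgresql://db-shared/main"
DEDICATED_DSN_TEMPLATE = "postgresql://db-{tenant_id}/main"

TENANT_TIERS = {
    "acme": "enterprise",   # premium tier -> dedicated DB
    "startup_io": "free",   # standard tier -> shared schema
}

def resolve_dsn(tenant_id: str) -> str:
    """Route enterprise tenants to a dedicated DB, everyone else to the shared one."""
    tier = TENANT_TIERS.get(tenant_id, "free")
    if tier == "enterprise":
        return DEDICATED_DSN_TEMPLATE.format(tenant_id=tenant_id)
    return SHARED_DSN
```

The point of the sketch: a tenant's tier, not its identity, picks the isolation strategy, so upgrading a customer to a dedicated DB is a metadata change rather than a code change.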

1.2

performance metrics — rps, latency, percentiles

How do you measure and interpret RPS, p50, p95, p99, SLA, and SLO correctly?
answer
Why do these metrics matter?

As a backend engineer, you have three basic questions for measuring how well your system runs: How many requests are coming in? (throughput), How long do they take? (latency), How many are failing? (error rate). Without these metrics you end up hunting for problems blind — the system may work, but you cannot see it.

RPS and throughput — the volume dimension

RPS (Requests Per Second) — how many requests arrive per second. TPS (Transactions Per Second) — the number of DB transactions. When talking about scale, the first question is: "how many RPS do we need?"

Real-world examples (peak RPS):
  • Small startup MVP — ~10 RPS
  • Growing B2B SaaS — ~100-500 RPS
  • Popular consumer app — ~1k-10k RPS
  • Twitter/X peak — ~500k RPS
  • Google search — ~100k RPS average, millions at peak

Latency and percentiles — the experience dimension

Latency — how long one request takes. But the average tells you almost nothing; percentiles are what matter.

What is a percentile?

Sort 100 requests by duration. p99 = the duration of the request in 99th place. In other words, 99% of requests are faster than it, 1% slower.

100 requests, sorted (ms): [10, 12, 15, 18, 20, ..., 50, ..., 200, 300, 800, 3000] — half of all requests are under 50ms (p50), 95% are under 200ms (p95), and the worst 1% hit 3000ms (p99).
The average — a lying metric

Take 100 requests: 99 of them at 50ms and 1 at 5000ms. The average is 99.5ms (looks normal!), while p99 is 5000ms (in real life someone is waiting 5 seconds). Always work with percentiles.
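Those numbers are easy to check in a few lines. This uses a simple index-based percentile (a sketch; real monitoring systems interpolate between samples):

```python
def percentile(sorted_vals, p):
    """Rough percentile: the value at the p% position of a sorted sample."""
    idx = min(p * len(sorted_vals) // 100, len(sorted_vals) - 1)
    return sorted_vals[idx]

# 99 fast requests at 50 ms, one slow one at 5000 ms
latencies = sorted([50] * 99 + [5000])

avg = sum(latencies) / len(latencies)
p50 = percentile(latencies, 50)
p99 = percentile(latencies, 99)
# avg = 99.5 (looks healthy), p50 = 50, p99 = 5000 (someone waits 5 s)
```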

How percentiles look on a dashboard
latency_tracking.py
from prometheus_client import Histogram
import time

# Histogram — stores the full distribution, so percentiles can be computed later
http_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint', 'status'],
    # buckets — a counter is kept for each of these ranges
    buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

@app.middleware("http")
async def track_latency(request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start
    
    http_duration.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).observe(duration)
    
    return response

PromQL queries in Grafana:

promql
# p99 over the last 5 minutes
histogram_quantile(0.99, 
  sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
)

# Per-endpoint p95
histogram_quantile(0.95,
  sum(rate(http_request_duration_seconds_bucket[5m])) by (le, endpoint)
)

# RPS
sum(rate(http_request_duration_seconds_count[1m]))

SLA, SLO, SLI — reliability targeting

This trio is the language of reliability

Most people say "99.9% uptime" and stop there. A professional system distinguishes three separate concepts: SLI — what you actually measure (e.g. "99.2% of requests finished under 500ms"). SLO — your internal target ("99% under 400ms"). SLA — the formal contract with the customer ("99.5% or a refund"). The SLO is always stricter than the SLA — that way you get warned before you ever breach the SLA.

SLI (Service Level Indicator) — the measurement: "p99 latency = 450ms". SLO (Service Level Objective) — the internal target: "p99 < 500ms, 99.9% of the time". SLA (Service Level Agreement) — the external contract: "99.5% uptime or a refund". Measure the SLI → compare it to the SLO → check how much margin is left before the SLA.
Error budget — the SRE's secret weapon

If the SLO is 99.9% uptime, you get a budget of about 43 minutes 49 seconds of downtime per (average) month. This is the "error budget", and it should be spent deliberately.

SLO % | Downtime/year | Downtime/month | Downtime/week
99.9% | 8h 45m | 43m 49s | 10m 4s
99.95% | 4h 22m | 21m 54s | 5m 2s
99.99% (4 nines) | 52m 35s | 4m 22s | 1m 0s
99.999% (5 nines) | 5m 15s | 26.3s | 6s
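The rows of that table are pure SLO arithmetic: allowed downtime = (1 − SLO) × period length. A sketch, taking a month as 365.25/12 days (which is how "43m 49s" falls out):

```python
# Allowed downtime for a given SLO over a period, in seconds.
SECONDS_PER_YEAR = 365.25 * 24 * 3600
SECONDS_PER_MONTH = SECONDS_PER_YEAR / 12
SECONDS_PER_WEEK = 7 * 24 * 3600

def downtime_budget(slo: float, period_seconds: float) -> float:
    """Seconds of downtime allowed before the SLO is breached."""
    return (1 - slo) * period_seconds

# 99.9% monthly -> ~2629.8 s, i.e. about 43m 49s
monthly = downtime_budget(0.999, SECONDS_PER_MONTH)
```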
Five nines is very expensive

Each additional "nine" raises cost roughly 10x. A bank's core system may genuinely need 99.999%, but 99.9% is enough for consumer SaaS. The smart business decision is choosing where the error budget goes: spend it on features (ship faster, accept less stability) or on stability.

Other important metrics

  • Apdex score — "user satisfaction", on a 0-1 scale. 0.9+ is good.
  • Time to First Byte (TTFB) — the time until the server starts responding.
  • Saturation — shows how full a resource is (CPU run queue, DB connection queue).
  • Error rate — the percentage of 5xx errors. Alarm when it exceeds 1%.
  • DAU/MAU stickiness — 0.5+ is excellent, 0.2+ acceptable.
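The Apdex score in the list above has a standard formula: satisfied requests (≤ T) count fully, tolerating requests (between T and 4T) count half, frustrated ones not at all. A small sketch with an assumed 500 ms threshold:

```python
def apdex(latencies_ms, threshold_ms=500):
    """Apdex = (satisfied + tolerating/2) / total, tolerating = (T, 4T]."""
    satisfied = sum(1 for t in latencies_ms if t <= threshold_ms)
    tolerating = sum(1 for t in latencies_ms if threshold_ms < t <= 4 * threshold_ms)
    return (satisfied + tolerating / 2) / len(latencies_ms)

# 8 fast, 1 tolerable, 1 frustrated request -> (8 + 0.5) / 10 = 0.85
score = apdex([100] * 8 + [900] + [5000], threshold_ms=500)
```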
1.3

noisy neighbor — multi-tenant performance isolation

One tenant is slowing down the entire system — what are the options?
answer
The problem

100 tenants share one PostgreSQL. Customer #47 exports 50 million rows every hour. While it runs, DB CPU hits 100% and the other 99 tenants slow down too. Everybody complains — but the cause is a single tenant.

The 5-layer mitigation strategy

Why 5 layers?

Each layer solves a different aspect of the problem, and they complement each other — applying only one is not enough. L1 caps the request count, L2 caps database connections, L3 separates heavy work, L4 provides physical isolation, and L5 gives the largest tenants full ownership. A request passes through the layers top to bottom — if a limit is exceeded, it is stopped at that layer.

TENANT REQUEST — every request passes through 5 layers:
  • L1 — Rate limiter: token bucket, Redis Lua, per-tenant → 429 Too Many Requests, or pass
  • L2 — Connection pool quota: PgBouncer, max N connections per tenant → wait in queue
  • L3 — Queue isolation: heavy task → slow lane, fast task → parallel fast workers
  • L4 — Tenant sharding: tenant_id → shard A/B/C — physical isolation
  • L5 — Dedicated infra: top 5% of tenants → separate cluster (Enterprise VIP)
→ RESPONSE

L1 — Token bucket rate limiter (Redis)

Token bucket — the bucket analogy

Imagine a separate bucket for every tenant. The bucket refills automatically over time (say, 10 tokens per second). Every request spends 1 token. If the bucket is empty, the request is rejected (429). Bursts are allowed — the bucket holds up to 20 tokens, so 20 requests can go through in one instant, after which you wait for it to refill.

Why a Redis Lua script? Because "check the token and decrement it" must be atomic — otherwise you get a race condition: two parallel requests both see "10 tokens left" and both slip through.

Per tenant: X requests per second, with bursts allowed up to Y. An atomic operation via a Lua script:

token_bucket.py
import time
from typing import NamedTuple

TOKEN_BUCKET_LUA = """
local key = KEYS[1]
local rate = tonumber(ARGV[1])         -- tokens per second
local capacity = tonumber(ARGV[2])     -- bucket size
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local bucket = redis.call('HMGET', key, 'tokens', 'last')
local tokens = tonumber(bucket[1]) or capacity
local last = tonumber(bucket[2]) or now

-- refill tokens for the elapsed time
local elapsed = math.max(0, now - last)
tokens = math.min(capacity, tokens + elapsed * rate)

if tokens < requested then
    return {0, tokens}  -- reject
end

tokens = tokens - requested
redis.call('HMSET', key, 'tokens', tokens, 'last', now)
redis.call('EXPIRE', key, 3600)
return {1, tokens}
"""

class Limits(NamedTuple):
    rate: int
    burst: int

class TenantRateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.script = redis_client.register_script(TOKEN_BUCKET_LUA)
    
    async def check(self, tenant_id: str, cost: int = 1) -> tuple[bool, int]:
        # Pick the limit for the tenant's tier (tenant_tier() resolves the plan)
        limits = TIER_LIMITS[tenant_tier(tenant_id)]
        
        allowed, remaining = await self.script(
            keys=[f"rl:{tenant_id}"],
            args=[limits.rate, limits.burst, time.time(), cost]
        )
        return bool(allowed), int(remaining)

TIER_LIMITS = {
    "free":       Limits(rate=10,  burst=20),    # 10 req/sec
    "pro":        Limits(rate=100, burst=200),
    "enterprise": Limits(rate=1000, burst=2000),
}
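Stripped of Redis, the same refill arithmetic can be simulated in plain Python to see the burst behaviour described above (a sketch, not the production path — the Lua script exists precisely because this must run atomically):

```python
class Bucket:
    """In-memory token bucket: refills at `rate`/s, holds up to `capacity`."""
    def __init__(self, rate, capacity):
        self.rate, self.capacity = rate, capacity
        self.tokens, self.last = capacity, 0.0

    def allow(self, now, cost=1):
        # refill for the elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens < cost:
            return False
        self.tokens -= cost
        return True

b = Bucket(rate=10, capacity=20)
burst = sum(b.allow(0.0) for _ in range(25))   # only the first 20 pass
later = b.allow(0.5)                           # 0.5 s later: 5 tokens refilled -> pass
```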

L3 — Heavy workload queue isolation (Celery)

Running fast and heavy tasks in separate worker pools — the single most effective technique.

celery_queues.py
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379/0')

app.conf.task_routes = {
    'app.tasks.send_email':    {'queue': 'fast'},
    'app.tasks.process_order': {'queue': 'fast'},
    'app.tasks.export_report': {'queue': 'heavy'},
    'app.tasks.ml_inference':  {'queue': 'gpu'},
}

# Per-tenant rate limit
@app.task(bind=True, rate_limit='100/m')
def send_email(self, tenant_id, to, subject):
    ...

# Heavy tasks
@app.task(bind=True, rate_limit='5/m', soft_time_limit=600)
def export_report(self, tenant_id, report_id):
    ...

Worker configuration — a separate pool per queue:

docker-compose.yml
services:
  worker-fast:
    image: myapp
    command: celery -A app worker -Q fast --concurrency=20 --max-tasks-per-child=1000
    deploy:
      replicas: 3

  worker-heavy:
    image: myapp
    command: celery -A app worker -Q heavy --concurrency=4 --max-tasks-per-child=100
    deploy:
      replicas: 2
      resources:
        limits:
          memory: 4G
          cpus: '2.0'

  worker-gpu:
    image: myapp-ml
    command: celery -A app worker -Q gpu --concurrency=1
    deploy:
      resources:
        reservations:
          devices:
            - capabilities: [gpu]
Production tip

A big tenant's export can no longer poison the fast queue. The result: user actions (login, order) always execute quickly, while heavy jobs run slower — which is expected.

02

auth & access control

2.1

authentication — session, jwt, oauth2, passkeys

Which auth techniques exist in 2026, and which one do you use when?
answer

The auth landscape — picking a technique

Session-based auth (classic, still alive)

How does session auth work?

When the user logs in, the server stores their data server-side (in Redis) and gives the browser only an ID — like a hotel key-card number. On every following request the browser sends that ID, and the server checks Redis to answer "who is this?".

Advantage: logout is trivial — delete the entry from Redis, and you can kill every session at once ("sign out of all devices"). Drawback: every request requires a Redis lookup, and horizontal scaling needs either sticky sessions or a centralized Redis.

session_auth.py
from fastapi import Request, Response, Cookie, HTTPException
import secrets, json, time

SESSION_TTL = 3600 * 24 * 7  # 7 days

async def login(email: str, password: str, request: Request, response: Response):
    user = await verify_credentials(email, password)
    if not user:
        raise HTTPException(401, "Invalid credentials")
    
    # Create the session ID
    sid = secrets.token_urlsafe(32)
    await redis.setex(
        f"session:{sid}",
        SESSION_TTL,
        json.dumps({
            "user_id": str(user.id),
            "tenant_id": str(user.tenant_id),
            "created_at": time.time(),
            "ip": request.client.host,
            "user_agent": request.headers.get("user-agent", "")
        })
    )
    
    # Secure cookie
    response.set_cookie(
        key="sid",
        value=sid,
        max_age=SESSION_TTL,
        httponly=True,    # not readable from JS (XSS protection)
        secure=True,      # HTTPS only
        samesite="lax",   # CSRF protection
    )
    return {"status": "logged_in"}

async def get_current_user(sid: str = Cookie(None)):
    if not sid:
        raise HTTPException(401)
    data = await redis.get(f"session:{sid}")
    if not data:
        raise HTTPException(401)
    return User(**json.loads(data))

async def logout(sid: str = Cookie(None)):
    await redis.delete(f"session:{sid}")

JWT — stateless, scalable

JWT — the passport analogy

Session-based auth is a hotel key card: every use asks the hotel's system "is this card valid?". A JWT is a passport stamped with the state's seal: the border officer does not query the government database each time, they just check the seal. If the seal is valid, you pass.

The result: with 10 microservices, each one verifies the JWT independently — no round trip to a central auth server. At high traffic this is a major advantage.

JWT's big problem — revocation

Lose your passport and it stays valid for the next 10 years. A JWT is the same — once issued, there is no way to cancel it (short of a blacklist). Hence the production pattern: a short-lived access token (15 min) + a refresh token (30 days, stored in Redis). Revoking the refresh token is easy — delete it from Redis. The access token keeps working until it expires (15 min) — an acceptable compromise.

JWT structure: Header . Payload . Signature — i.e. {alg: RS256} . {sub, exp, ...} . HMAC/RSA(key), serialized as eyJhbGciOi... . eyJzdWIiOi... . SVX3dGQq... Decoding = reversing base64 (no secret needed!). Verifying = checking the signature (secret required).
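That decode/verify distinction is easy to demonstrate with nothing but the standard library — a hand-rolled HS256 token for illustration (use a real JWT library in production):

```python
import base64, hashlib, hmac, json

def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def b64url_decode(s: str) -> bytes:
    return base64.urlsafe_b64decode(s + "=" * (-len(s) % 4))

secret = b"server-side-secret"
header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
payload = b64url(json.dumps({"sub": "user-42"}).encode())
sig = b64url(hmac.new(secret, f"{header}.{payload}".encode(), hashlib.sha256).digest())
token = f"{header}.{payload}.{sig}"

# Anyone can DECODE the payload -- no secret involved:
claims = json.loads(b64url_decode(token.split(".")[1]))

# Only the secret holder can VERIFY -- recompute and compare the signature:
h, p, s = token.split(".")
expected = b64url(hmac.new(secret, f"{h}.{p}".encode(), hashlib.sha256).digest())
valid = hmac.compare_digest(s, expected)
```

This is why a JWT must never carry secrets in its payload: the payload is readable by anyone holding the token.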
jwt_hybrid.py
"""Production-grade hybrid: short JWT + long refresh token"""
import jwt
import secrets, json, time
from datetime import datetime, timedelta, timezone
from fastapi import HTTPException

ACCESS_TOKEN_TTL = timedelta(minutes=15)
REFRESH_TOKEN_TTL = timedelta(days=30)

class TokenService:
    def __init__(self, secret: str, algorithm: str = "HS256"):
        self.secret = secret
        self.algorithm = algorithm
    
    def create_access(self, user: User) -> str:
        now = datetime.now(timezone.utc)
        payload = {
            "sub": str(user.id),
            "tenant": str(user.tenant_id),
            "roles": user.roles,
            "iat": now,
            "exp": now + ACCESS_TOKEN_TTL,
            "type": "access",
        }
        return jwt.encode(payload, self.secret, algorithm=self.algorithm)
    
    async def create_refresh(self, user: User) -> str:
        token = secrets.token_urlsafe(32)
        # Stored in Redis — so it can be revoked
        await redis.setex(
            f"refresh:{token}",
            REFRESH_TOKEN_TTL.total_seconds(),
            json.dumps({"user_id": str(user.id), "created_at": time.time()})
        )
        return token
    
    def verify_access(self, token: str) -> dict:
        try:
            payload = jwt.decode(token, self.secret, algorithms=[self.algorithm])
            if payload.get("type") != "access":
                raise HTTPException(401, "Invalid token type")
            return payload
        except jwt.ExpiredSignatureError:
            raise HTTPException(401, "Token expired")
        except jwt.InvalidTokenError:
            raise HTTPException(401, "Invalid token")
    
    async def refresh(self, refresh_token: str) -> dict:
        data = await redis.get(f"refresh:{refresh_token}")
        if not data:
            raise HTTPException(401, "Invalid refresh token")
        
        info = json.loads(data)
        user = await get_user(info["user_id"])
        
        # Rotation — delete the old refresh token, issue a new one
        await redis.delete(f"refresh:{refresh_token}")
        
        return {
            "access_token": self.create_access(user),
            "refresh_token": await self.create_refresh(user),
        }

OAuth 2.0 / OIDC — third-party login

Why OAuth? — the core problem

Imagine: you sign in to Spotify, and it wants to see your Google contacts. The naive solution — give Spotify your Google password. That is a terrible security mistake: Spotify would gain access to your Gmail, Drive, YouTube — everything. If Spotify gets hacked, your Google account goes with it.

OAuth solves this: you sign in to Google directly, and Google gives Spotify a limited grant (a token) — only for the requested permissions. Spotify never sees your password.

Authorization Code Flow — the safest flow for server-side web apps. 8 steps over two different channels:

1. The user clicks "Login with Google".
2. Your app 302-redirects the browser to /authorize?scope&state&redirect_uri.
3. Google shows its login page.
4. The user enters email + password and clicks "Allow".
5. Google redirects to /callback?code=AUTH_CODE — a one-time code, ~60 s, via the browser.
6. Your backend POSTs to /token with {code + client_secret} — server-to-server over HTTPS.
7. Google returns {access_token, id_token(JWT)} — the id_token carries sub, email, name, picture.
8. Your app issues its own session cookie. Logged in!
Security: steps 1-5 go through the browser (visible); steps 6-7 are server-to-server (secret). client_secret lives only in the server environment — never in frontend code! Mobile/SPA apps have no client_secret → they use PKCE (code_challenge + code_verifier).
Why the AUTH_CODE → token exchange? (steps 5→6→7)

In step 5 Google hands out an AUTH_CODE — a one-time, ~60-second temporary key. Why not hand out the token directly? Because step 5 travels through the browser — the code appears in the URL and stays in browser history. If an attacker captures the URL, all they get is a spent code. In step 6 your server exchanges the AUTH_CODE using the client_secret — a channel that exists only between your server and Google; the browser never sees it.

oauth_callback.py
import os
from fastapi import Request
from authlib.integrations.starlette_client import OAuth

oauth = OAuth()
oauth.register(
    name='google',
    client_id=os.getenv('GOOGLE_CLIENT_ID'),
    client_secret=os.getenv('GOOGLE_CLIENT_SECRET'),
    server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
    client_kwargs={'scope': 'openid email profile'}
)

@app.get('/auth/google')
async def google_login(request: Request):
    redirect_uri = request.url_for('google_callback')
    return await oauth.google.authorize_redirect(request, redirect_uri)

@app.get('/auth/google/callback')
async def google_callback(request: Request):
    token = await oauth.google.authorize_access_token(request)
    user_info = token.get('userinfo') or await oauth.google.parse_id_token(request, token)
    
    # Find or create user
    user = await find_or_create_user(
        email=user_info['email'],
        name=user_info.get('name'),
        google_sub=user_info['sub'],
    )
    
    # Issue your own session/tokens
    return issue_tokens(user)

Passkeys — the new 2026 standard

The new standard replacing passwords, built on WebAuthn + FIDO2. The private key lives on the user's device (protected by Touch ID, Face ID, or Windows Hello); the server holds only the public key.

Why passkeys?

Apple, Google, and Microsoft have made them the default since 2024; by 2026 they are available in every major SaaS. Phishing-resistant (bound to the domain), no password (nothing to leak), and simple for the user (Face ID).

passkey_register.py
from fastapi import Depends
from webauthn import generate_registration_options, verify_registration_response
from webauthn.helpers.structs import AuthenticatorSelectionCriteria, UserVerificationRequirement

RP_ID = "myapp.com"
RP_NAME = "My App"

@app.post("/webauthn/register/begin")
async def register_begin(user: User = Depends(get_current_user)):
    options = generate_registration_options(
        rp_id=RP_ID,
        rp_name=RP_NAME,
        user_id=str(user.id).encode(),
        user_name=user.email,
        user_display_name=user.name,
        authenticator_selection=AuthenticatorSelectionCriteria(
            user_verification=UserVerificationRequirement.REQUIRED,
        ),
    )
    # Store the challenge in the session
    await redis.setex(f"webauthn:challenge:{user.id}", 300, options.challenge)
    return options

@app.post("/webauthn/register/complete")
async def register_complete(credential: dict, user: User = Depends(get_current_user)):
    expected_challenge = await redis.get(f"webauthn:challenge:{user.id}")
    verification = verify_registration_response(
        credential=credential,
        expected_challenge=expected_challenge,
        expected_origin=f"https://{RP_ID}",
        expected_rp_id=RP_ID,
    )
    # Save the public key to the DB
    await db.save_credential(
        user_id=user.id,
        credential_id=verification.credential_id,
        public_key=verification.credential_public_key,
        sign_count=verification.sign_count,
    )
    return {"ok": True}

Selection guide

Auth method | Use case | Pros | Cons
Session + cookie | Server-rendered apps (Django, Rails) | Instant logout, safer against XSS | Stateful, scaling is harder
JWT (hybrid) | SPA, mobile, microservices | Stateless, scalable | Logout is hard
OAuth2/OIDC | Third-party login | Good UX, no password | Dependency on the provider
Passkeys | Phishing-resistant login | Most secure, great UX | Tied to devices
API keys | Server-to-server | Simple | Needs rotation
mTLS | Internal services | Most secure | Certificate management
2.2

authorization — rbac, abac, rebac, policy engines

RBAC, ABAC, ReBAC — which one to choose when, and how to roll it out?
answer

RBAC (Role-Based Access Control)

The classic approach: users are assigned roles, and roles carry permissions.

rbac_schema.sql
CREATE TABLE roles (
    id UUID PRIMARY KEY,
    tenant_id UUID NOT NULL,
    name TEXT NOT NULL,
    UNIQUE (tenant_id, name)
);

CREATE TABLE permissions (
    id UUID PRIMARY KEY,
    resource TEXT NOT NULL,    -- 'orders', 'reports'
    action TEXT NOT NULL,      -- 'read', 'write', 'delete'
    UNIQUE (resource, action)
);

CREATE TABLE role_permissions (
    role_id UUID REFERENCES roles(id) ON DELETE CASCADE,
    permission_id UUID REFERENCES permissions(id) ON DELETE CASCADE,
    PRIMARY KEY (role_id, permission_id)
);

CREATE TABLE user_roles (
    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
    role_id UUID REFERENCES roles(id) ON DELETE CASCADE,
    PRIMARY KEY (user_id, role_id)
);
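The join those four tables encode can be sketched in memory — a hypothetical check with made-up data, mirroring the user → role → permission path the schema expresses:

```python
# In-memory mirror of the RBAC tables above (illustrative data).
role_permissions = {
    "admin":  {("orders", "read"), ("orders", "write"), ("orders", "delete")},
    "viewer": {("orders", "read")},
}
user_roles = {"alice": {"admin"}, "bob": {"viewer"}}

def has_permission(user: str, resource: str, action: str) -> bool:
    """user -> roles -> permissions: the same join the SQL schema performs."""
    return any(
        (resource, action) in role_permissions.get(role, set())
        for role in user_roles.get(user, set())
    )
```

In production the same check is one SQL join across user_roles, role_permissions, and permissions, typically cached per request.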

ABAC (Attribute-Based)

Decisions based on attributes: user attributes + resource attributes + context. For complex rules.

abac.py
def can_access_report(user, report, context):
    """Rules for viewing a report — deliberately complex"""
    # Admins see everything
    if "admin" in user.roles:
        return True
    
    # Own department
    if user.department_id == report.department_id:
        return True
    
    # Managers — reports written by their direct subordinates
    if user.role == "manager" and report.author_id in user.subordinate_ids:
        return True
    
    # Finance team — financial reports during business hours only
    if "finance" in user.roles and report.type == "financial":
        now = context["now"]
        if 9 <= now.hour < 18 and now.weekday() < 5:
            return True
    
    # Region restriction
    if report.classification == "confidential":
        if user.region != report.region:
            return False
    
    return False

ReBAC (Relationship-Based) — Google Zanzibar style

Google Drive, GitHub, and Notion use this model. Access is expressed as "user X has relation R to resource Y".

Example relationships:
  user:alice → owner → document:report_2026
  user:bob → editor → document:report_2026
  group:finance → viewer → document:report_2026
  user:carol → member → group:finance
Query: "Can Carol view report_2026?" Graph walk: carol → member → finance → viewer → report_2026 ✓
openfga.py
# OpenFGA — the open-source take on Zanzibar
from openfga_sdk import OpenFgaClient

fga = OpenFgaClient(...)

# Create relationships
await fga.write(tuples=[{
    "user": "user:alice",
    "relation": "owner",
    "object": "document:report_2026"
}, {
    "user": "group:finance#member",
    "relation": "viewer",
    "object": "document:report_2026"
}])

# Permission check
result = await fga.check({
    "user": "user:carol",
    "relation": "can_view",
    "object": "document:report_2026"
})
# result.allowed = True (carol ∈ finance, finance ∋ viewer)

Policy engines — Open Policy Agent (OPA)

Not code, but policy as code. Authorization rules live in a declarative language (Rego):

policy.rego
package app.authz

default allow = false

# Admins may do anything
allow {
    input.user.roles[_] == "admin"
}

# Owners may access their own resources
allow {
    input.resource.owner_id == input.user.id
}

# Readers may read published resources only
allow {
    input.action == "read"
    input.resource.status == "published"
    input.user.roles[_] == "reader"
}

# Workday restriction for finance
allow {
    input.user.department == "finance"
    input.resource.type == "financial_report"
    workday
    time.clock(time.now_ns())[0] >= 9
    time.clock(time.now_ns())[0] < 18
}

workday {
    day := time.weekday(time.now_ns())
    day != "Saturday"
    day != "Sunday"
}

When to pick what?

Pick RBAC if
  • The hierarchy is simple: admin/manager/user
  • Permissions rarely change
  • You need to implement quickly
  • You are a startup or a simple SaaS
RBAC is not enough if
  • You need per-resource owner/editor/viewer
  • Rules depend on time or location
  • You need cross-tenant sharing (Google Drive)
  • Compliance (SOC2, GDPR) demands it
Zero Trust — the default standard

"Trust nothing, verify everything." On every request: authenticate → authorize → audit. Internal services speak mTLS too, via SPIFFE/SPIRE and an Istio service mesh. We cover this in later sections.

03

api design — all protocols

3.1

rest api — resource design, versioning, hateoas

How should a modern REST API be structured? Versioning, pagination, error handling.
answer

Resource modeling — URL structure

Good REST URLs (nouns, hierarchical):
  ✓ GET /api/v1/orders
  ✓ GET /api/v1/orders/abc-123
  ✓ POST /api/v1/orders
  ✓ PATCH /api/v1/orders/abc-123
  ✓ DELETE /api/v1/orders/abc-123
  ✓ GET /api/v1/users/u-456/orders (sub-resource)

Bad URLs (verbs, actions in URL):
  ✗ GET /api/getAllOrders
  ✗ POST /api/createOrder
  ✗ POST /api/orders/abc-123/update
  ✗ GET /api/order?id=abc-123

Actions that don't fit CRUD:
  → POST /api/v1/orders/abc-123/cancel (resource action)
  → POST /api/v1/orders/abc-123:refund (Google AIP style)

Versioning — 3 approaches

Strategy | Example | Pros | Cons
URL path | /api/v2/orders | Explicit, cache-friendly | Breaking change is visible
Header | Accept: application/vnd.app.v2+json | Clean URL | Caching is harder
Query param | ?version=2 | Easy | Anti-pattern

Recommendation: URL path versioning — the simplest and clearest choice in 95% of cases.

Pagination — 3 techniques

1. Offset pagination (the simplest, but risky)
offset.py
@app.get("/orders")
async def list_orders(page: int = 1, per_page: int = 20):
    offset = (page - 1) * per_page
    orders = await db.fetch_all(
        "SELECT * FROM orders WHERE tenant_id = $1 "
        "ORDER BY created_at DESC LIMIT $2 OFFSET $3",
        tenant_id, per_page, offset
    )
    total = await db.fetch_val("SELECT COUNT(*) FROM orders WHERE tenant_id = $1", tenant_id)
    return {
        "data": orders,
        "meta": {"page": page, "per_page": per_page, "total": total}
    }

The problem: with OFFSET 100000, PostgreSQL reads 100k rows and throws them away. Slow. Worse, if the data changes between pages, the user can see some items twice.

2. Cursor pagination (the production standard)
cursor.py
import base64

def encode_cursor(value: dict) -> str:
    return base64.urlsafe_b64encode(json.dumps(value).encode()).decode()

def decode_cursor(cursor: str) -> dict:
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))

@app.get("/orders")
async def list_orders(cursor: str | None = None, limit: int = 20):
    limit = min(limit, 100)
    
    if cursor:
        c = decode_cursor(cursor)
        rows = await db.fetch_all("""
            SELECT * FROM orders 
            WHERE tenant_id = $1 AND (created_at, id) < ($2, $3)
            ORDER BY created_at DESC, id DESC
            LIMIT $4
        """, tenant_id, c["created_at"], c["id"], limit + 1)
    else:
        rows = await db.fetch_all("""
            SELECT * FROM orders WHERE tenant_id = $1
            ORDER BY created_at DESC, id DESC LIMIT $2
        """, tenant_id, limit + 1)
    
    has_more = len(rows) > limit
    rows = rows[:limit]
    
    next_cursor = None
    if has_more and rows:
        last = rows[-1]
        next_cursor = encode_cursor({"created_at": last.created_at.isoformat(), "id": str(last.id)})
    
    return {"data": rows, "next_cursor": next_cursor, "has_more": has_more}
Cursor pagination foydalari

O(log n) har query · Data o'zgarishga chidamli · Infinite scroll uchun ideal · Twitter, Stripe, Shopify — hammasi cursor. Composite cursor (created_at + id) — tie-break uchun.

Error handling — RFC 9457 (avvalgi RFC 7807) Problem Details

errors.py
from fastapi import HTTPException
from fastapi.responses import JSONResponse

class APIError(HTTPException):
    def __init__(self, status: int, type_: str, title: str, detail: str = "", **extensions):
        self.type = type_
        self.title = title
        self.detail = detail
        self.extensions = extensions
        super().__init__(status_code=status, detail=detail)

@app.exception_handler(APIError)
async def api_error_handler(request, exc: APIError):
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "type": exc.type,
            "title": exc.title,
            "status": exc.status_code,
            "detail": exc.detail,
            "instance": str(request.url),
            **exc.extensions,
        },
        media_type="application/problem+json"
    )

# Ishlatish
raise APIError(
    status=422,
    type_="https://api.myapp.com/errors/validation",
    title="Validation failed",
    detail="amount must be positive",
    field="amount",
    received=-100,
)
response body
{
  "type": "https://api.myapp.com/errors/validation",
  "title": "Validation failed",
  "status": 422,
  "detail": "amount must be positive",
  "instance": "https://api.myapp.com/orders",
  "field": "amount",
  "received": -100
}

HTTP status — to'g'ri tanlash

Status                | Qachon                      | Misol
200 OK                | Muvaffaqiyatli GET/PATCH    | Order ma'lumoti qaytadi
201 Created           | POST yangi resource yaratdi | Yangi order yaratildi
202 Accepted          | Async operation boshlandi   | Report generation queue'da
204 No Content        | DELETE muvaffaqiyatli       | Order o'chirildi
400 Bad Request       | Mijoz xato yubordi          | JSON parse fail
401 Unauthorized      | Auth yo'q yoki noto'g'ri    | Token expired
403 Forbidden         | Auth bor, lekin ruxsat yo'q | Boshqaning order'i
404 Not Found         | Resource mavjud emas        | Order topilmadi
409 Conflict          | State conflict              | Duplicate email signup
422 Unprocessable     | Validation failed           | Pydantic error
429 Too Many Requests | Rate limit                  | API limit exceeded
500 Server Error      | Bizning bug                 | Unhandled exception
502 Bad Gateway       | Upstream fail               | External API down
503 Unavailable       | Vaqtinchalik tushgan        | Deployment ongoing
3.2

graphql vs grpc vs trpc — qachon qaysi?

REST'dan tashqari qanday protokollar bor va qachon ishlatiladi?
javob

GraphQL — flexible query language

Oddiy tushuntirish

REST: server qaror qiladi "nima qaytarish kerak". GraphQL: client qaror qiladi "menga aynan bular kerak". Client underfetching/overfetching muammosini hal qiladi.

graphql query
query GetUserDashboard($userId: ID!) {
  user(id: $userId) {
    name
    email
    orders(limit: 5, status: ACTIVE) {
      id
      total
      items {
        product { name, price }
        quantity
      }
    }
    notifications(unread: true) {
      id
      message
    }
  }
}

Bitta request → butun dashboard data. REST'da 4-5 ta request kerak bo'lardi.

GraphQL +
  • Client aynan kerakli data oladi
  • Versioning kerak emas (schema evolution)
  • Strong typing (schema)
  • Tool'lar zo'r (Apollo, Relay)
GraphQL −
  • N+1 muammosi (DataLoader kerak)
  • Caching qiyin
  • File upload noqulay
  • Learning curve
  • Query complexity security
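N+1 va DataLoader g'oyasini kutubxonasiz ko'rsatadigan soddalashtirilgan sketch (klass va `batch_get_users` resolver nomi — faraziy; production'da `aiodataloader` yoki Strawberry'ning tayyor DataLoader'i ishlatiladi): bitta event-loop tick ichidagi barcha `load()` chaqiriqlari yig'ilib, N ta alohida query o'rniga bitta batch so'rovga aylanadi.

```python
import asyncio

class DataLoader:
    """N+1 yechimining yadrosi: bitta tick ichidagi load() chaqiriqlarini
    yig'ib, bitta batch so'rov bilan bajaradi (faraziy sketch)."""

    def __init__(self, batch_fn):
        self.batch_fn = batch_fn    # list[key] -> {key: value} (bitta DB query)
        self._pending = {}          # key -> Future

    async def load(self, key):
        loop = asyncio.get_running_loop()
        if key not in self._pending:
            self._pending[key] = loop.create_future()
            if len(self._pending) == 1:
                # Birinchi chaqiriq keyingi tick'da dispatch'ni rejalashtiradi
                loop.call_soon(lambda: loop.create_task(self._dispatch()))
        return await self._pending[key]

    async def _dispatch(self):
        batch, self._pending = self._pending, {}
        values = await self.batch_fn(list(batch))   # N ta query emas — bitta
        for key, fut in batch.items():
            fut.set_result(values.get(key))

async def demo():
    calls = []

    async def batch_get_users(ids):   # faraziy batch resolver
        calls.append(ids)             # nechta DB roundtrip bo'lganini sanaymiz
        return {i: f"user-{i}" for i in ids}

    loader = DataLoader(batch_get_users)
    names = await asyncio.gather(loader.load(1), loader.load(2), loader.load(1))
    return names, calls

# asyncio.run(demo()) → (['user-1', 'user-2', 'user-1'], [[1, 2]])
```

E'tibor bering: uchta `load()` chaqirildi, lekin `batch_get_users` faqat bir marta, `[1, 2]` bilan ishladi — GraphQL resolver'larida aynan shu pattern N+1'ni yo'q qiladi.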

gRPC — high-performance RPC

Google yaratgan. HTTP/2 + Protocol Buffers. REST'dan 2-10 baravar tezroq. Microservice-to-microservice kommunikatsiya uchun ideal.

order.proto
syntax = "proto3";

package orders.v1;

service OrderService {
  rpc CreateOrder(CreateOrderRequest) returns (Order);
  rpc GetOrder(GetOrderRequest) returns (Order);
  rpc ListOrders(ListOrdersRequest) returns (ListOrdersResponse);
  rpc StreamOrderUpdates(StreamRequest) returns (stream OrderUpdate);
}

message Order {
  string id = 1;
  string tenant_id = 2;
  string user_id = 3;
  repeated OrderItem items = 4;
  double total = 5;
  OrderStatus status = 6;
  google.protobuf.Timestamp created_at = 7;
}

enum OrderStatus {
  PENDING = 0;
  PAID = 1;
  SHIPPED = 2;
  DELIVERED = 3;
  CANCELLED = 4;
}

message OrderUpdate {
  string order_id = 1;
  OrderStatus status = 2;
}
server.py
import grpc
from orders_pb2_grpc import OrderServiceServicer, add_OrderServiceServicer_to_server
from orders_pb2 import Order, OrderUpdate, OrderStatus

class OrderServicer(OrderServiceServicer):
    async def CreateOrder(self, request, context):
        order = await db.create_order(request.user_id, request.items)
        return Order(
            id=str(order.id),
            tenant_id=order.tenant_id,
            user_id=order.user_id,
            total=order.total,
            status=OrderStatus.PENDING,
        )
    
    async def StreamOrderUpdates(self, request, context):
        """Server streaming — live updates"""
        async for update in order_update_stream(request.user_id):
            yield OrderUpdate(order_id=update.id, status=update.status)

async def serve():
    server = grpc.aio.server()
    add_OrderServiceServicer_to_server(OrderServicer(), server)
    server.add_insecure_port('[::]:50051')
    await server.start()
    await server.wait_for_termination()

tRPC — TypeScript end-to-end type safety

Agar sizning stack'ingiz Node.js backend + React/Next.js frontend bo'lsa — tRPC revolyutsion. Backend type'lari avtomatik frontend'ga keladi.

trpc_server.ts
import { initTRPC } from '@trpc/server';
import { z } from 'zod';

const t = initTRPC.create();

export const appRouter = t.router({
  getOrder: t.procedure
    .input(z.object({ id: z.string() }))
    .query(async ({ input }) => {
      return await db.order.findUnique({ where: { id: input.id } });
    }),
  
  createOrder: t.procedure
    .input(z.object({
      userId: z.string(),
      items: z.array(z.object({ productId: z.string(), qty: z.number() }))
    }))
    .mutation(async ({ input }) => {
      return await db.order.create({ data: input });
    }),
});

export type AppRouter = typeof appRouter;
trpc_client.tsx
// Client — backend type'larini to'liq biladi
const { data, isLoading } = trpc.getOrder.useQuery({ id: "abc-123" });
//      ^^^^ type: Order | undefined, fully typed

const createOrder = trpc.createOrder.useMutation();
createOrder.mutate({ userId: "u-1", items: [{ productId: "p-1", qty: 2 }] });
// If you pass wrong type — TypeScript error at compile time

Tanlash matritsasi

Protocol  | Qachon tanlash               | Performance | DX
REST      | Public API, klassik CRUD     | Yaxshi      | Yaxshi
GraphQL   | Mobile + web, complex UI     | O'rtacha    | Zo'r
gRPC      | Microservice-to-microservice | Eng yaxshi  | O'rtacha
tRPC      | TS mono-repo (Next.js)       | Yaxshi      | Eng yaxshi
WebSocket | Real-time bidirectional      | Yaxshi      | O'rtacha
SSE       | Server → client stream       | Yaxshi      | Yaxshi
Hybrid approach — tavsiya

Praktikada: public API uchun REST, internal service'lar o'rtasida gRPC, real-time uchun WebSocket, mobile client'lar uchun GraphQL (agar murakkablik shuni oqlasa). Bitta arxitekturada bir nechta protokolni birga ishlatish normal.

3.3

http request-to-response — to'liq hayot sikli

Browser "Enter" bosgandan to javob kelguncha nima bo'ladi? Har bir qadam qanday ishlaydi?
javob
Nima uchun buni bilish kerak?

Ko'pchilik developer "request yuboraman, response keladi" deb o'ylaydi. Lekin bu orada 15+ ta qatlam va qaror qabul qilish nuqtasi bor. Performance muammosi qayerda? Xavfsizlik tekshiruvi qayerda? Caching qayerda yutadi? Load balancer qanday qaror qiladi? Bularni bilmay tizim optimize qilish — ko'r bo'lib ot minish.

Request oqimi — qadam-baqadam:

1. Browser/Client — GET /api/orders (HTTP/2)
2. DNS Resolver — api.myapp.com → IP
3. TLS Handshake — certificate verify
4. CDN Edge — cache hit bo'lsa javob shu yerdan qaytadi
5. Load Balancer — round-robin / health check
6. WAF + Rate Limiter — DDoS, injection block
7. App Server (FastAPI/Django) — middleware stack: 1) auth/JWT verify, 2) request logging, 3) tenant routing
8. Cache check — Redis hit bo'lsa DB'ga borilmaydi
9. DB (PostgreSQL) — query + index scan
10. External services — payment, email, AI
11. HTTP Response — 200 OK + JSON body

Timing (production p50):
DNS: 1-5ms (cached) · TCP+TLS: 30-60ms (1-ulanish) · CDN hit: 5-20ms (ideal) · LB routing: 1-2ms · WAF check: 1-3ms · Middleware: 1-5ms · Redis hit: 1-4ms · DB query: 5-50ms · Serialize: 1-3ms
Total p50: 50-120ms · p99 target: <500ms

Har qatlamda nima bo'ladi — chuqur tahlil

1. DNS — Telefon kitobi

api.myapp.com → brauzer avval local cache'dan qidiradi (msec), keyin OS DNS cache, keyin ISP DNS server (50-100ms), nihoyat root DNS (nadir). DNS TTL muhim: 300 sekund = har 5 minutda lookup. CDN ishlatilsa, DNS Anycast texnikasi bilan geografik eng yaqin serverga yo'naltiriladi. DNS propagation — domain o'zgartirganda 24-48 soat olishi mumkin, shuning uchun deploy oldidan TTL'ni pasaytirish kerak.

2. TLS Handshake — Maxfiy kalit kelishuvi

Birinchi ulanishda: (1) Client "Hello" — qanday encryption ishlata olishini aytadi, (2) Server sertifikat yuboradi, (3) Client sertifikatni CA (Certificate Authority) orqali tekshiradi, (4) Session key kelishiladi. Bu ~1-2 round trip = 60-120ms. Keyingi ulanishlarda TLS Session Resumption bu xarajatni 0'ga tushiradi. HTTP/2 multiplexing bilan bitta TLS session'da parallel requestlar.

3. Load Balancer — Trafikni taqsimlash

LB (Nginx, HAProxy, AWS ALB) so'rovni qaysi server olishi kerakligini hal qiladi. Algoritmlar: Round-robin (navbat bilan), Least connections (kam band serverga), IP hash (bir client har doim bir serverga — session affinity), Weighted (kuchli serverga ko'proq). LB sog'liq tekshiruvlari: har 5 sekundda GET /health — javob bermagan server ishdan chiqariladi.
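Yuqoridagi algoritmlarning mantig'ini ko'rsatadigan minimal sketch (server nomlari va klass — faraziy; real LB rolini Nginx/HAProxy/ALB bajaradi, bu faqat tushuntirish uchun):

```python
import itertools

class LoadBalancer:
    """Round-robin va least-connections algoritmlarining sodda sketchi."""

    def __init__(self, servers):
        self.servers = list(servers)
        self.healthy = set(self.servers)             # health check natijasi
        self.active = {s: 0 for s in self.servers}   # ochiq connection soni
        self._cycle = itertools.cycle(self.servers)

    def round_robin(self):
        # Navbat bilan — faqat sog'lom serverlar orasida
        for _ in range(len(self.servers)):
            s = next(self._cycle)
            if s in self.healthy:
                return s
        raise RuntimeError("healthy backend qolmadi")

    def least_connections(self):
        # Eng kam band serverga yo'naltirish
        return min(self.healthy, key=lambda s: self.active[s])

    def mark_down(self, server):
        # GET /health javob bermadi → rotatsiyadan chiqarish
        self.healthy.discard(server)

lb = LoadBalancer(["app-1", "app-2", "app-3"])
lb.round_robin()      # → "app-1"
lb.round_robin()      # → "app-2"
lb.mark_down("app-3")
lb.round_robin()      # → "app-1"  (app-3 skip qilinadi)
```

IP hash va weighted variantlari ham shu skeletga qo'shimcha strategiya sifatida qo'shiladi.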

4. Middleware — "Darvozon qo'riqchilari"

Request app handler'ga yetishidan oldin bir qator "filtr"lardan o'tadi. Har filtr o'z qarorini qiladi — davom ettirish yoki to'xtatish. FastAPI'da: Auth middleware (JWT tekshirish), CORS (cross-origin headers), Request ID (tracing uchun UUID), Rate limit, Body size limit, Timeout. Chain'da kech aniqlangan xato qimmatga tushadi: auth'dan o'tib, rate limit'dan ham o'tib, keyin DB'da yozuv topilmasa — resurs allaqachon sarflangan bo'ladi. Shuning uchun tekshiruvlar tartibi muhim: arzon tekshiruvlar qimmatlaridan oldin turishi kerak.

middleware_chain.py
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
import time, uuid, logging

app = FastAPI()

# CORS — eng avval
app.add_middleware(CORSMiddleware,
    allow_origins=["https://app.mysite.com"],
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
    max_age=3600,  # preflight cache
)

@app.middleware("http")
async def request_lifecycle(request: Request, call_next):
    """Request-to-response hayot siklini boshqarish"""
    # ── 1. REQUEST ID (tracing) ──
    request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
    request.state.request_id = request_id

    # ── 2. START TIMING ──
    start = time.perf_counter()

    # ── 3. STRUCTURED LOGGING ──
    logger = logging.getLogger("request")
    logger.info({
        "event": "request_start",
        "request_id": request_id,
        "method": request.method,
        "path": request.url.path,
        "ip": request.client.host,
    })

    try:
        # ── 4. PROCESS (next middleware / handler) ──
        response: Response = await call_next(request)

        # ── 5. ADD RESPONSE HEADERS ──
        duration_ms = (time.perf_counter() - start) * 1000
        response.headers["X-Request-ID"] = request_id
        response.headers["X-Response-Time"] = f"{duration_ms:.2f}ms"
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"

        # ── 6. LOG COMPLETION ──
        logger.info({
            "event": "request_end",
            "request_id": request_id,
            "status": response.status_code,
            "duration_ms": round(duration_ms, 2),
        })

        # ── 7. SLOW REQUEST ALERT ──
        if duration_ms > 1000:
            logger.warning({
                "event": "slow_request",
                "request_id": request_id,
                "path": request.url.path,
                "duration_ms": round(duration_ms, 2),
            })

        return response

    except Exception as exc:
        duration_ms = (time.perf_counter() - start) * 1000
        logger.error({
            "event": "request_error",
            "request_id": request_id,
            "error": str(exc),
            "duration_ms": round(duration_ms, 2),
        })
        raise

HTTP Status Codes — professional guide

Code    | Nomi              | Qachon ishlatish                                     | Keng tarqalgan xato
200     | OK                | GET, PUT, PATCH muvaffaqiyatli                       | —
201     | Created           | POST — yangi resurs yaratildi, Location header qo'sh | POST'ni ham 200 qaytarish
204     | No Content        | DELETE muvaffaqiyatli, body yo'q                     | —
400     | Bad Request       | Validation xato (schema, format)                     | Server xatosini 400 deyish
401     | Unauthorized      | Token yo'q yoki invalid                              | 403 o'rniga 401 ishlatish
403     | Forbidden         | Authenticated, lekin ruxsat yo'q                     | Resursni "topilmadi" deb yashirish (401 o'rniga)
404     | Not Found         | Resurs mavjud emas                                   | Har xatoni 404 deyish
409     | Conflict          | Duplicate entry, concurrent modification             | —
422     | Unprocessable     | FastAPI default — Pydantic validation xato           | —
429     | Too Many Requests | Rate limit, Retry-After header qo'sh                 | —
500     | Internal Error    | Server xatosi — stack trace foydalanuvchiga CHIQMASin| Stack trace expose qilish
502/503 | Bad/Unavail       | Upstream service down yoki overload                  | —

RFC 9457 Problem Details — professional error format

Nima uchun standart error format?

Har developer o'z formatida error qaytarsa — frontend har endpoint uchun alohida error handling yozishi kerak. RFC 9457 (avvalgi RFC 7807) — HTTP API'larda error uchun standart JSON format. type — machine-readable URI, title — human-readable nom, detail — aniq ma'lumot. FastAPI'da bu formatni custom exception handler orqali osongina joriy qilish mumkin.

error_handling.py
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel

# RFC 9457 — Problem Details format
class ProblemDetail(BaseModel):
    type: str            # URI — mashinaga o'qiladi
    title: str           # Qisqa xato nomi
    status: int          # HTTP status code
    detail: str          # Aniq, foydali ma'lumot
    instance: str = ""   # Qaysi URL'da bo'ldi

# Custom exception
class AppError(Exception):
    def __init__(self, type: str, title: str, detail: str, status: int = 400):
        self.type = type
        self.title = title
        self.detail = detail
        self.status = status

app = FastAPI()

@app.exception_handler(AppError)
async def app_error_handler(request: Request, exc: AppError):
    return JSONResponse(
        status_code=exc.status,
        content={
            "type": f"https://docs.myapp.com/errors/{exc.type}",
            "title": exc.title,
            "status": exc.status,
            "detail": exc.detail,
            "instance": str(request.url),
        },
        headers={"Content-Type": "application/problem+json"}
    )

# Ishlatish:
@app.post("/orders")
async def create_order(data: OrderCreate):
    if data.amount <= 0:
        raise AppError(
            type="invalid-amount",
            title="Invalid Order Amount",
            detail=f"Amount must be positive, got {data.amount}",
            status=400
        )
    if not await check_inventory(data.product_id):
        raise AppError(
            type="out-of-stock",
            title="Product Out of Stock",
            detail=f"Product {data.product_id} is currently unavailable",
            status=409
        )
    ...

# Response:
# HTTP 409
# Content-Type: application/problem+json
# {
#   "type": "https://docs.myapp.com/errors/out-of-stock",
#   "title": "Product Out of Stock",
#   "status": 409,
#   "detail": "Product abc-123 is currently unavailable",
#   "instance": "/orders"
# }
3.4

websocket, sse, long polling — real-time protokollar

Real-time ma'lumot uzatish uchun qaysi protokol qachon ishlatiladi?
javob
HTTP polling muammosi — nima uchun yangi protokollar kerak?

Klassik HTTP: client so'rovni yuboradi, server javob qaytaradi — connection yopiladi. Lekin chat, live notification, stock price kabi hollarda server client'ga birinchi bo'lib xabar yuborishi kerak. HTTP polling yechimi: "har 1 sekundda serverga so'rov" — 99% so'rov "yangilik yo'q" javobini oladi, lekin hammasi ham server resursini isrof qiladi.

Texnika       | Qanday ishlaydi                        | Latency  | Server load             | Qachon
Short Polling | Har N sekundda GET                     | N sekund | Juda yuqori             | Ishlatmang
Long Polling  | Server xabar bo'lguncha kutadi (30-60s)| ~0ms     | O'rta (connection hold) | Fallback uchun
SSE           | HTTP keep-alive, server push only      | <50ms    | Past                    | Bir tomonlama: log, AI stream
WebSocket     | Bi-directional persistent TCP          | <10ms    | O'rta                   | Chat, collaboration, gaming
WebRTC        | P2P, UDP-based                         | <5ms     | Juda past (P2P)         | Video/voice call
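Long polling'ning server tomonidagi yadro mantig'ini ko'rsatadigan soddalashtirilgan sketch (`Inbox` klassi va nomlar — faraziy, framework'siz): xabar bo'lmasa handler timeout'gacha kutadi, xabar kelishi bilan darhol javob qaytaradi.

```python
import asyncio

class Inbox:
    """Long polling serverining yadro mantig'i (faraziy sketch):
    xabar bo'lmasa — timeout'gacha kutish, bo'lsa — darhol javob."""

    def __init__(self):
        self._queues = {}   # user_id -> asyncio.Queue

    def _q(self, user_id):
        return self._queues.setdefault(user_id, asyncio.Queue())

    async def publish(self, user_id, message):
        await self._q(user_id).put(message)

    async def long_poll(self, user_id, timeout: float = 30.0):
        # GET /poll handler ichida chaqiriladi — connection ochiq turadi
        try:
            msg = await asyncio.wait_for(self._q(user_id).get(), timeout)
            return {"status": 200, "message": msg}
        except asyncio.TimeoutError:
            # Yangilik yo'q — client darhol yangi poll so'rovini yuboradi
            return {"status": 204, "message": None}

async def demo():
    inbox = Inbox()
    poller = asyncio.create_task(inbox.long_poll("u-1", timeout=5))
    await asyncio.sleep(0.1)                      # client kutmoqda...
    await inbox.publish("u-1", "order shipped")   # xabar keldi
    return await poller

# asyncio.run(demo()) → {'status': 200, 'message': 'order shipped'}
```

Jadvaldagi "connection hold" xarajati shu yerda ko'rinadi: har kutayotgan client bitta ochiq connection va bitta task ushlab turadi.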

SSE — Server-Sent Events: LLM streaming uchun ideal

SSE nima uchun LLM stream uchun zo'r?

ChatGPT matni "harfma-harf" ko'rsatiladi — bu SSE. LLM token'larni birma-bir generate qiladi. SSE bilan har token kelishi bilan browser'ga push qilinadi. WebSocket kerak emas — chunki foydalanuvchi faqat oladi, yuborishning keragi yo'q (faqat boshlang'ich prompt HTTP orqali keladi). SSE'ning afzalligi: oddiy HTTP, automatic reconnect, browser native.

sse_stream.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import AsyncAnthropic
import json

app = FastAPI()
client = AsyncAnthropic()

@app.post("/api/chat/stream")
async def chat_stream(prompt: str, user_id: str):
    """LLM javobini SSE orqali streaming"""

    async def generate():
        # SSE format: "data: {json}\n\n"
        # Connection o'rnatildi signali
        yield f"data: {json.dumps({'type': 'start'})}\n\n"

        full_text = ""
        try:
            # Claude streaming API
            async with client.messages.stream(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            ) as stream:
                async for text_chunk in stream.text_stream:
                    full_text += text_chunk
                    # Har token'ni darhol yuborish
                    yield f"data: {json.dumps({'type': 'chunk', 'text': text_chunk})}\n\n"

            # Tugash signali
            yield f"data: {json.dumps({'type': 'done', 'total_chars': len(full_text)})}\n\n"

        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",   # Nginx buffering'ni o'chirish
            "Connection": "keep-alive",
        }
    )

# Frontend tomonida: endpoint POST bo'lgani uchun EventSource (faqat GET
# bilan ishlaydi) emas, fetch + ReadableStream ishlatiladi:
# const res = await fetch('/api/chat/stream', { method: 'POST', body: ... });
# const reader = res.body.getReader();
# // har chunk'dan "data: {...}\n\n" satrlari parse qilinib,
# // type === 'chunk' bo'lsa matn UI'ga qo'shiladi, 'done' bo'lsa o'qish to'xtaydi
04

caching & performance

4.1

caching layers — browser, cdn, app, db

Cache qayerda ishlatiladi va har bir qatlam nima qiladi?
javob
Cache nima va nima uchun kerak?

Cache — bu "oldindan javob tayyorlab qo'yish". Tasavvur qiling: qo'shni har kuni sizdan "soat nechada?" deb so'raydi. Har safar telefonga qarab vaqtni ko'rsatish o'rniga, bitta katta soat devorga osilib qo'ysa — minglagan so'rovga bitta javob. Kompyuterda ham xuddi shunday: DB'ga yoki API'ga qayta-qayta borish o'rniga, natijani vaqtinchalik joyda saqlab, keyingi so'rovda shu joydan olinadi.

Cache kuch-quvvati: faqat bitta qatlam emas, 5 ta qatlam bor — har biri turli muammoni hal qiladi. Browser cache Internet traffic'ni tejaydi. CDN geografik masofani qisqartiradi. App memory sub-millisecond javob beradi. Redis distributed state'ni saqlaydi. DB query cache og'ir hisoblashlarni qayta bajarmaydi.

5 qatlamli cache arxitekturasi

L1 — Browser Cache: Cache-Control, ETag · ~1-5ms · static assets, API responses
L2 — CDN Edge (Cloudflare, Fastly): static + cacheable API · ~10-50ms · geographic distribution
L3 — Application Memory (in-process): LRU, TTL cache · <1ms · user session, feature flags
L4 — Distributed Cache (Redis, Memcached): shared across instances · ~1-5ms · session, rate limit, hot data
L5 — Database Query Cache + Materialized Views: PostgreSQL buffer, matview · ~5-30ms · expensive aggregations

Request oqimi: L1 → L2 → L3 → L4 → L5 → DB. Har qatlamda miss bo'lsa, keyingisiga o'tadi.
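L3 qatlam (in-process cache) g'oyasini ko'rsatadigan minimal LRU + TTL sketch (faraziy implementatsiya; production'da `cachetools.TTLCache` kabi tayyor kutubxona ham ishlatiladi):

```python
import time
from collections import OrderedDict

class TTLCache:
    """In-process LRU + TTL cache sketchi — feature flag, session kabi
    kichik, tez-tez o'qiladigan data uchun (faraziy, kutubxonasiz)."""

    def __init__(self, maxsize: int = 1024, ttl: float = 30.0):
        self.maxsize, self.ttl = maxsize, ttl
        self._data = OrderedDict()   # key -> (expires_at, value)

    def get(self, key, default=None):
        item = self._data.get(key)
        if item is None:
            return default
        expires_at, value = item
        if time.monotonic() >= expires_at:
            del self._data[key]            # TTL tugagan — o'chiramiz
            return default
        self._data.move_to_end(key)        # LRU: yaqinda ishlatildi
        return value

    def set(self, key, value):
        self._data[key] = (time.monotonic() + self.ttl, value)
        self._data.move_to_end(key)
        if len(self._data) > self.maxsize:
            self._data.popitem(last=False)  # eng eski (LRU) kirish chiqariladi

flags = TTLCache(maxsize=256, ttl=30.0)    # feature flags — 30s TTL
flags.set("new_checkout", True)
flags.get("new_checkout")                  # → True (<1ms, network'siz)
```

Muhim nuance: bu cache har instance'da alohida — instance'lar orasida konsistensiya kerak bo'lsa, L4 (Redis) ishlatiladi.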

Cache invalidation — eng qiyin masala

Phil Karlton's law

"There are only two hard things in Computer Science: cache invalidation and naming things." — 1996 yilda aytilgan, hali ham haqiqat.

Pattern 1 — Write-through (predictable)
write_through.py
async def update_user(user_id: str, data: dict):
    # 1. DB yangilanadi
    user = await db.update(user_id, data)
    # 2. Cache yangilanadi bir vaqtda
    await redis.setex(f"user:{user_id}", 3600, json.dumps(user.dict()))
    return user
Pattern 2 — Write-behind (fast, risky)
write_behind.py
async def increment_view_count(post_id: str):
    # Cache'ga tez yozamiz
    await redis.incr(f"views:{post_id}")
    # DB'ga keyingi batch'da yoziladi (har 30 sekundda)

# Background job
async def flush_view_counts():
    while True:
        # Production'da KEYS emas, SCAN — KEYS butun keyspace'ni bloklaydi
        keys = [k async for k in redis.scan_iter("views:*")]
        if keys:
            async with db.transaction():
                for key in keys:
                    post_id = key.split(":")[1]
                    # GETDEL — o'qish va o'chirish atomar,
                    # oradagi increment'lar yo'qolmaydi
                    count = await redis.getdel(key)
                    if count:
                        await db.execute(
                            "UPDATE posts SET view_count = view_count + $1 WHERE id = $2",
                            int(count), post_id
                        )
        await asyncio.sleep(30)
Pattern 3 — Cache-aside with versioning (flexible)
versioned_cache.py
async def get_product(product_id: str) -> dict:
    # Global version prefix — mass invalidate uchun
    version = await redis.get("cache:products:version") or "1"
    key = f"product:v{version}:{product_id}"
    
    cached = await redis.get(key)
    if cached:
        return json.loads(cached)
    
    product = await db.fetch_product(product_id)
    await redis.setex(key, 3600, json.dumps(product.dict()))
    return product.dict()

async def invalidate_all_products():
    """Bitta product schema o'zgardi → hammasini bekor qilish"""
    await redis.incr("cache:products:version")
    # Eski key'lar TTL bilan tabiiy o'ladi (disk cleanup)

TTL (Time To Live) — to'g'ri tanlash

Data tipi       | TTL         | Sabab
Stock price     | 1-5 sekund  | Real-time kerak
User profile    | 5-15 daqiqa | Kam o'zgaradi
Product catalog | 1 soat      | Admin vaqti-vaqtida o'zgartiradi
Country list    | 1 kun       | Deyarli o'zgarmaydi
Feature flags   | 30 sekund   | Toggle darhol ishlasin
LLM responses   | 1 soat      | Token cost tejash
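TTL'ni qo'llashda bitta amaliy detal: bir vaqtda yozilgan minglab key bir sekundda birga expire bo'lmasligi uchun TTL'ga tasodifiy jitter qo'shiladi (funksiya nomi — faraziy, oddiy helper sketch):

```python
import random

def ttl_with_jitter(base_ttl: int, jitter: float = 0.1) -> int:
    """TTL'ga ±jitter foiz tasodifiy og'ish qo'shadi — bir partiyada
    cache'langan key'lar bir lahzada birga o'lib, DB'ga to'lqin
    yubormasligi uchun."""
    delta = int(base_ttl * jitter)
    return base_ttl + random.randint(-delta, delta)

# Ishlatish (faraziy): await redis.setex(key, ttl_with_jitter(3600), payload)
# 3600s → har key 3240-3960s oralig'ida turlicha expire bo'ladi
```

Bu — keyingi bo'limdagi cache stampede muammosining oldini olishning eng arzon usullaridan biri.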

Cache stampede — klassik muammo

Popular key TTL tugadi → 1000 parallel request DB'ga uriladi → DB crash.

stampede_protection.py
async def get_with_stampede_protection(key: str, fetch_fn, ttl: int = 300):
    cached = await redis.get(key)
    if cached:
        return json.loads(cached)
    
    # Distributed lock — faqat bitta request DB'ga boradi
    lock_key = f"lock:{key}"
    acquired = await redis.set(lock_key, "1", nx=True, ex=10)
    
    if not acquired:
        # Boshqa request DB'dan olmoqda — kutamiz
        for _ in range(20):
            await asyncio.sleep(0.1)
            cached = await redis.get(key)
            if cached:
                return json.loads(cached)
        # 2 sek kutdik, hali yo'q — DB'ga o'zimiz boramiz (lock'siz,
        # boshqaning lock'iga tegmaymiz)
        value = await fetch_fn()
        await redis.setex(key, ttl, json.dumps(value))
        return value
    
    try:
        value = await fetch_fn()
        await redis.setex(key, ttl, json.dumps(value))
        return value
    finally:
        # Faqat lock egasi o'chiradi — aks holda boshqa request'ning
        # lock'i bekor qilinib, stampede qaytadan boshlanadi
        await redis.delete(lock_key)
Part II

data & persistence layer

Database arxitekturasi, async sistemalar, event-driven patterns. Data — backend'ning yuragi. Bu bo'limda klassik PostgreSQL'dan vector database'lar va event sourcing'gacha.

05

database architecture — indexes, sharding, vectors

5.1

postgresql indexing mastery

Har index turi qachon kerak? Qanday to'g'ri tanlash va qanday ishlatish?
javob
Index — kitob mundarijasi analogiyasi

Index — kitobning mundarijasi. Mundarijasiz "PostgreSQL" so'zini topish uchun 800 sahifani boshdan oxir ko'rib chiqish kerak (Sequential Scan — O(n)). Mundarija bilan darhol "281-bet" topiladi (Index Scan — O(log n)). Lekin mundarija ham bepul emas: joy oladi, har yangi sahifa qo'shilganda yoki o'zgartirilganda yangilanishi kerak (INSERT/UPDATE sekinlashadi).

Amaliy qoida: Index — read'ni tezlashtiradi, write'ni sekinlashtiradi. Ko'p o'qiladigan, kam yoziladigan ustunlarga index qo'ying. Ko'p yoziladigan, kam o'qiladigan (masalan, audit log) — indexsiz qolsin. Keraksiz index — katta xato: write performance yo'qoladi, disk joyi band bo'ladi, query planner chalkashadi.

Index turlari — to'liq katalog

Type            | Qachon                  | Misol
B-tree          | Default. 95% case       | Equality, range, sort
Hash            | Faqat equality          | Session ID lookup
GIN             | JSONB, array, full-text | WHERE tags @> ARRAY['python']
GiST            | Geospatial, ranges      | PostGIS, tstzrange
BRIN            | Very large sorted data  | Time-series logs, 5B+ rows
SP-GiST         | Non-balanced trees      | IP ranges, geometric
Bloom           | Multi-column equality   | Wide table point lookups
HNSW (pgvector) | Vector similarity       | AI embeddings (keyingi bo'lim!)

B-tree — composite va partial

btree_advanced.sql
-- Composite — LEFT-MOST RULE muhim
CREATE INDEX idx_orders_tenant_user_date 
  ON orders (tenant_id, user_id, created_at DESC);

-- Ishlaydi:
SELECT * FROM orders WHERE tenant_id = ? AND user_id = ? ORDER BY created_at DESC;
SELECT * FROM orders WHERE tenant_id = ? AND user_id = ?;
SELECT * FROM orders WHERE tenant_id = ?;

-- Ishlamaydi (index skip):
SELECT * FROM orders WHERE user_id = ?;              -- tenant_id yo'q
SELECT * FROM orders WHERE created_at > ?;           -- chap 2 yo'q


-- Partial index — faqat kerakli subsetga
CREATE INDEX idx_orders_active 
  ON orders (tenant_id, created_at DESC)
  WHERE status IN ('pending', 'processing');
-- 90% order'lar "delivered" bo'lsa, index 10x kichikroq


-- Covering index (INCLUDE) — Index Only Scan
CREATE INDEX idx_users_email_covering 
  ON users (email) 
  INCLUDE (name, avatar_url);
-- SELECT name, avatar_url FROM users WHERE email = '...' 
-- → index'dan to'g'ridan to'g'ri, heap'ga bormaydi


-- Expression index
CREATE INDEX idx_users_lower_email ON users (lower(email));
-- SELECT * FROM users WHERE lower(email) = '...' uchun


-- UNIQUE partial — soft delete uchun
CREATE UNIQUE INDEX idx_users_email_active 
  ON users (email) 
  WHERE deleted_at IS NULL;
-- Soft-deleted user'ning email'ini qayta ishlatish mumkin

GIN — JSONB va array uchun kuch

gin_indexes.sql
-- JSONB — flexible schema
CREATE TABLE events (
    id UUID PRIMARY KEY,
    tenant_id UUID NOT NULL,
    event_type TEXT,
    properties JSONB
);

-- GIN — har JSONB path'ga index
CREATE INDEX idx_events_properties ON events USING GIN (properties);

-- Tez query'lar:
SELECT * FROM events WHERE properties @> '{"user_type": "premium"}';
SELECT * FROM events WHERE properties ? 'campaign_id';
SELECT * FROM events WHERE properties -> 'metadata' ->> 'source' = 'google';

-- JSONB path ops class (tez va kichik, lekin kam ops)
CREATE INDEX idx_events_props_path ON events USING GIN (properties jsonb_path_ops);


-- Array
CREATE TABLE articles (id SERIAL, title TEXT, body TEXT, tags TEXT[]);
CREATE INDEX idx_articles_tags ON articles USING GIN (tags);

SELECT * FROM articles WHERE tags @> ARRAY['python', 'web'];   -- hamma
SELECT * FROM articles WHERE tags && ARRAY['python', 'rust'];  -- kamida 1


-- Full-text search
ALTER TABLE articles ADD COLUMN search_vector tsvector
  GENERATED ALWAYS AS (
    setweight(to_tsvector('english', coalesce(title, '')), 'A') ||
    setweight(to_tsvector('english', coalesce(body, '')), 'B')
  ) STORED;

CREATE INDEX idx_articles_search ON articles USING GIN (search_vector);

SELECT *, ts_rank(search_vector, query) AS rank
FROM articles, plainto_tsquery('english', 'backend architecture') query
WHERE search_vector @@ query
ORDER BY rank DESC;

BRIN — massive time-series

5 milliard qator, sorted by time? B-tree 200 GB. BRIN 600 MB. Faqat "block range" saqlaydi (har 128 blok uchun min/max). Time-range query'lar uchun yetarli.

brin.sql
CREATE TABLE metrics (
    id BIGSERIAL,
    tenant_id UUID,
    metric_name TEXT,
    value DOUBLE PRECISION,
    ts TIMESTAMPTZ NOT NULL
);

-- Time asosida BRIN (time-series natural ordering)
CREATE INDEX idx_metrics_ts_brin 
  ON metrics USING BRIN (ts) 
  WITH (pages_per_range = 128);

-- tenant_id composite BRIN
CREATE INDEX idx_metrics_tenant_ts_brin 
  ON metrics USING BRIN (tenant_id, ts);

Index qachon ZARARLI?

Over-indexing xavf

Har index:
· Disk joy oladi (~10-30% table size)
· Write sekinlashtiradi (har INSERT/UPDATE har indexni yangilaydi)
· Memory/cache ishlatadi
· VACUUM vaqtini uzaytiradi

10+ index bo'lgan jadval — red flag. Auditing kerak.

Unused index'larni topish
unused.sql
-- Ishlatilmayotgan index'lar
-- (indisunique pg_index'da — pg_stat_user_indexes bilan JOIN kerak)
SELECT 
    s.schemaname, s.relname AS tablename, s.indexrelname AS indexname,
    pg_size_pretty(pg_relation_size(s.indexrelid)) AS size,
    s.idx_scan
FROM pg_stat_user_indexes s
JOIN pg_index i ON i.indexrelid = s.indexrelid
WHERE s.idx_scan < 50      -- juda kam ishlatilgan
  AND NOT i.indisunique    -- unique emas (unique'ni tashlab bo'lmaydi)
ORDER BY pg_relation_size(s.indexrelid) DESC
LIMIT 20;

-- Dublikat index'lar
SELECT indrelid::regclass AS table, 
       array_agg(indexrelid::regclass) AS indexes
FROM pg_index
GROUP BY indrelid, indkey
HAVING count(*) > 1;
5.2

EXPLAIN ANALYZE — query tuning mastery

Sekin query'ni qanday aniqlash va optimallashtirish?
javob

EXPLAIN output'ini o'qish

explain_example.sql
EXPLAIN (ANALYZE, BUFFERS, VERBOSE, FORMAT TEXT)
SELECT o.*, u.name 
FROM orders o
JOIN users u ON u.id = o.user_id
WHERE o.tenant_id = 'xxx'
  AND o.status = 'pending'
ORDER BY o.created_at DESC
LIMIT 20;

/* Output (labels explained):
Limit  (cost=0.85..12.34 rows=20 width=128)
                    ^     ^       ^
         estimate start  total  rows
  (actual time=0.123..2.456 rows=20 loops=1)
                ^          ^
          actual start   actual end
  ->  Nested Loop  (cost=0.85..5234.12 rows=1234 width=128)
        ->  Index Scan using idx_orders_tenant_status on orders o
              (cost=0.43..234.56 rows=1234 width=64)
              (actual time=0.012..0.456 rows=1234 loops=1)
              Index Cond: (tenant_id = 'xxx' AND status = 'pending')
              Buffers: shared hit=45 read=3
                                ^^^    ^^^^
                               cache   disk
        ->  Index Scan using users_pkey on users u
              (cost=0.29..4.05 rows=1 width=64)
              (actual time=0.001..0.001 rows=1 loops=1234)
                                                 ^^^^^^^^^
                                              1234 ta iteration!
              Index Cond: (id = o.user_id)
Planning Time: 0.234 ms
Execution Time: 2.678 ms
*/

Node turlari — nima yomon, nima yaxshi

Node type        | Nima                           | Yaxshi / Yomon
Index Only Scan  | Index'dan to'g'ridan to'g'ri   | ★ Eng tez
Index Scan       | Index → heap                   | ✓ Yaxshi
Bitmap Heap Scan | Ko'p qator — bitmap orqali     | ◐ O'rtacha
Seq Scan         | To'liq jadvalni o'qish         | ✕ Katta jadvalda yomon
Nested Loop      | Har qator uchun inner loop     | ◐ Kichik N'da yaxshi
Hash Join        | Hash table quradi              | ✓ Katta N'da yaxshi
Merge Join       | Ikkala tomon sortlangan bo'lsa | ✓ Sort'da tez
Sort             | ORDER BY uchun                 | ✕ Memory'dan oshsa yomon (external sort)

Real optimization — 12 sek → 80 ms

slow_query.sql
-- BOSHLANG'ICH (12 sekund!)
SELECT 
    u.name, COUNT(o.id) AS orders, SUM(o.amount) AS revenue,
    (SELECT MAX(created_at) FROM orders WHERE user_id = u.id) AS last_order
FROM users u
LEFT JOIN orders o ON o.user_id = u.id
WHERE u.tenant_id = 'xxx'
  AND u.created_at > NOW() - INTERVAL '1 year'
GROUP BY u.id, u.name
ORDER BY revenue DESC NULLS LAST
LIMIT 100;

-- EXPLAIN ANALYZE ko'rsatdi:
-- 1. Seq Scan on users (tenant_id + created_at index yo'q)
-- 2. Correlated subquery MAX(created_at) - har user uchun alohida query!
-- 3. Nested Loop 500k iteration

-- YECHIM:
-- Index
CREATE INDEX idx_users_tenant_created ON users (tenant_id, created_at DESC);
CREATE INDEX idx_orders_user_created ON orders (user_id, created_at DESC);

-- Query rewrite — correlated subquery'ni LATERAL bilan
SELECT 
    u.name, 
    COALESCE(stats.orders, 0) AS orders,
    COALESCE(stats.revenue, 0) AS revenue,
    stats.last_order
FROM users u
LEFT JOIN LATERAL (
    SELECT 
        COUNT(*) AS orders,
        SUM(amount) AS revenue,
        MAX(created_at) AS last_order
    FROM orders o 
    WHERE o.user_id = u.id
) stats ON true
WHERE u.tenant_id = 'xxx'
  AND u.created_at > NOW() - INTERVAL '1 year'
ORDER BY stats.revenue DESC NULLS LAST
LIMIT 100;

-- Natija: 12s → 80ms. 150x tezroq.

pg_stat_statements — asosiy debugging tool

pg_stat.sql
-- Extension
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;

-- Eng sekin query'lar
SELECT 
    substring(query, 1, 80) AS query,
    calls,
    round(total_exec_time::numeric, 2) AS total_ms,
    round(mean_exec_time::numeric, 2) AS avg_ms,
    round(stddev_exec_time::numeric, 2) AS stddev_ms
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 20;

-- Eng ko'p I/O qiladiganlar
SELECT 
    substring(query, 1, 80) AS query,
    calls,
    shared_blks_read,  -- disk read
    shared_blks_hit    -- cache hit
FROM pg_stat_statements
ORDER BY shared_blks_read DESC
LIMIT 20;
5.3

connection pooling — pgbouncer deep dive

Connection pool qanday ishlaydi va qaysi mode qachon ishlatiladi?
javob
Muammo

Har PostgreSQL connection ~10 MB RAM oladi + process fork. 1000 connection = 10 GB RAM faqat connection'larga. PostgreSQL'da max_connections odatda 100-200. Lekin sizning application 50 ta container × 10 connection = 500 connection kerak.

App pod 1 (10 conn)   App pod 2 (10 conn)   App pod 3 (10 conn)   ...   App pod 50 (10 conn)
        └────────────────────── 500 connections ──────────────────────┘
                                   ▼
                   PgBouncer (transaction pool)
                   500 clients ↔ 20 backends
                   multiplexing by transaction
                                   ▼
                   PostgreSQL (max_connections=100)
                   20 backend aktiv · 80 ta zaxirada

Multiplexing: 500 client connection → 20 real PostgreSQL backend. Resource 25x tejaladi.

PgBouncer pool modes

Mode        | Ishlash                        | Multiplexing    | Cheklov
session     | Client connect → disconnect    | Yo'q (1:1)      | Multiplexing foydasi yo'q
transaction | Har transaction uchun backend  | Zo'r (50:1 OK)  | No prepared stmt*, no LISTEN/NOTIFY
statement   | Har statement uchun            | Eng yaxshi      | No transactions (faqat SELECT)

* psycopg 3.1+ va asyncpg 0.28+ — transaction pool bilan prepared statement'ni qo'llab-quvvatlaydi (protocol-level disable).

pgbouncer.ini
[databases]
myapp = host=postgres-primary.internal port=5432 dbname=myapp

[pgbouncer]
listen_port = 6432
listen_addr = 0.0.0.0

# Auth
auth_type = scram-sha-256
auth_file = /etc/pgbouncer/userlist.txt

# Pool config
pool_mode = transaction           # CRITICAL choice
max_client_conn = 1000            # clients PgBouncer'ga
default_pool_size = 20            # backends PG'ga (per db/user)
reserve_pool_size = 5             # emergency
reserve_pool_timeout = 3

# Timeouts
server_lifetime = 3600
server_idle_timeout = 600
query_timeout = 30
idle_transaction_timeout = 60

# Logging
log_connections = 0
log_disconnections = 0
stats_period = 60

Application config — SQLAlchemy

engine.py
from sqlalchemy.ext.asyncio import create_async_engine

# PgBouncer ORQALI connect qilamiz (6432 port)
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@pgbouncer.internal:6432/myapp",
    pool_size=10,
    max_overflow=5,
    pool_pre_ping=True,           # connection tekshirish (health check)
    pool_recycle=3600,            # har soatda qayta yaratish
    connect_args={
        # MUHIM: transaction pool mode uchun
        "statement_cache_size": 0,
        "prepared_statement_cache_size": 0,
    }
)

Monitoring — PgBouncer stats

monitoring.sql
-- PgBouncer admin'ga connect
-- psql -h pgbouncer -p 6432 pgbouncer

SHOW POOLS;
-- cl_active | cl_waiting | sv_active | sv_idle
--   45      |     0      |    18     |    2

SHOW STATS;
-- total_xact_count, total_query_count, avg_xact_time

SHOW CLIENTS;
-- clients va ularning state'i
Real case — Beeline Uzbekistan

50 ta FastAPI pod, har biri 10 connection = 500 client connection. PostgreSQL max_connections = 200 (monitoring, backup, admin uchun ham kerak). PgBouncer transaction mode bilan: 500 client → 20 backend connection. Latency p99 kamaydi, PostgreSQL CPU 30%ga tushdi.

5.4

sharding, replication, read replicas

Vertical scale tugadi — horizontal qanday qilinadi?
javob

Scaling journey — order matters

Stage 1: Optimize     → index, query tuning, caching (80% muammo hal)
Stage 2: Vertical     → kattaroq server (more CPU, RAM, NVMe) (10% hal)
Stage 3: Read replica → read trafikni distribute (5% hal)
Stage 4: Sharding     → write'ni ham distribute (4% hal)
Stage 5: Rethink DB   → boshqa data model (NoSQL, vector) (1% hal)

Muammolarning 80% Stage 1 dayoq hal bo'ladi. Stage 4 ga yetganingiz — 1M+ user degani.

Replication topologies

Primary-Replica (eng oddiy)
replication_setup.sh
# Primary postgresql.conf
wal_level = replica
max_wal_senders = 10
wal_keep_size = 2GB
synchronous_commit = on           # default; to'liq sync replication uchun
                                  # synchronous_standby_names ham sozlash kerak

# Replica uchun user (primary'da, psql orqali)
psql -c "CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD '...';"

# Replica setup
pg_basebackup -h primary -D /var/lib/postgresql/data \
  -U replicator -P -W -R

# Replica automatic:
# standby.signal file yaratiladi
# primary_conninfo postgresql.auto.conf'ga yoziladi
Async vs Sync replication
Async (default) +
  • Primary write tez
  • Replica'ga bog'liq emas
  • Network issue primary'ni bloklash yo'q
Async −
  • Replication lag (100ms-5s)
  • Primary crash — oxirgi commit'lar yo'qolishi
  • Read-after-write consistency yo'q
read_after_write.py
"""Read-after-write pattern: just-updated data uchun primary'dan o'qish"""

class SmartDBRouter:
    async def update_and_track(self, user_id: str, data: dict):
        user = await self.primary.update_user(user_id, data)
        # Marker — keyingi 5 sek primary'dan o'qish
        await redis.setex(f"primary_only:{user_id}", 5, "1")
        return user
    
    async def read_user(self, user_id: str):
        # Check — just updated?
        if await redis.get(f"primary_only:{user_id}"):
            return await self.primary.get_user(user_id)
        
        # Lag check
        lag = await self.get_replica_lag()
        if lag > 10:  # 10 sek dan ko'p
            return await self.primary.get_user(user_id)
        
        return await self.replica.get_user(user_id)
    
    async def get_replica_lag(self) -> float:
        result = await self.replica.fetch_val(
            "SELECT EXTRACT(EPOCH FROM (NOW() - pg_last_xact_replay_timestamp()))"
        )
        return float(result or 0)

Sharding strategies

Strategy           | Qachon                       | Pro / Con
Hash-based         | Teng distribution kerak      | + Balance, − Range query qiyin
Range-based        | Time-series, natural ranges  | + Range fast, − Hot spot
Tenant-based       | SaaS (one shard per tenant)  | + Isolation, − Re-shard qiyin
Geographic         | Multi-region compliance      | + GDPR, − Cross-region join
Consistent hashing | Dinamik shard qo'shish       | + Re-shard oson, − Murakkab
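Jadvaldagi consistent hashing g'oyasini minimal sketch'da ko'rsatish mumkin: har shard ring'da bir necha "virtual" nuqta oladi, key esa hash bo'yicha eng yaqin keyingi nuqtaga tushadi. Bu faraziy, soddalashtirilgan misol — production'da tayyor kutubxona yoki DB'ning o'z routing'idan foydalaning:

```python
import hashlib
from bisect import bisect_right

class ConsistentHashRing:
    """Minimal consistent hashing — shard tanlash sketchi."""
    def __init__(self, nodes: list[str], vnodes: int = 100):
        points = []
        for node in nodes:
            # Har node uchun vnodes ta virtual nuqta — teng distribution uchun
            for i in range(vnodes):
                points.append((self._hash(f"{node}:{i}"), node))
        points.sort()
        self.hashes = [h for h, _ in points]
        self.nodes = [n for _, n in points]

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get_node(self, key: str) -> str:
        # Soat yo'nalishida keyingi nuqta; ring oxirida — boshiga qaytamiz
        idx = bisect_right(self.hashes, self._hash(key)) % len(self.hashes)
        return self.nodes[idx]

ring = ConsistentHashRing(["shard-1", "shard-2", "shard-3"])
node = ring.get_node("tenant-123")   # bir xil key — har doim bir xil shard
```

Yangi shard qo'shilganda faqat ~1/N key ko'chadi — jadvaldagi "+ Re-shard oson" aynan shu xususiyat.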
Citus — PostgreSQL sharding layer
citus.sql
-- Citus extension (native PostgreSQL sharding)
CREATE EXTENSION citus;

-- Shardlash uchun jadval
CREATE TABLE events (
    id BIGSERIAL,
    tenant_id UUID NOT NULL,
    event_type TEXT,
    payload JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Shard key — tenant_id
SELECT create_distributed_table('events', 'tenant_id');

-- Citus avtomatik 32 shard yaratadi (default)
-- Query'lar avtomatik to'g'ri shard'ga boradi
-- Cross-shard aggregate: parallel scatter-gather

-- Reference table (kichik, har shard'da copy)
CREATE TABLE tenants (id UUID PRIMARY KEY, name TEXT);
SELECT create_reference_table('tenants');
Modern alternatives — new-gen databases

Klassik PostgreSQL sharding o'rniga tayyor scalable DB'lar: CockroachDB (distributed SQL), YugabyteDB (PostgreSQL-wire compatible), Neon (serverless PG with branching), PlanetScale (MySQL + Vitess). Operatsion yuk kamroq, lekin pul xarajati yuqoriroq.

5.5

vector databases — ai infrastructure yuragi

pgvector, Pinecone, Qdrant, Weaviate — qaysi birini tanlash? Amaliy realitet.
javob
Nima uchun vector DB?

AI model matn yoki rasmni embedding'ga aylantiradi — 384 yoki 1536 o'lchovli raqamlar vektori. "Bu matnga o'xshash matnlar qaysilari?" degan savolga javob — vector similarity. B-tree index'lar bu vazifaga yaroqsiz — HNSW yoki IVF kerak.
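Yuqoridagi "vector similarity" aslida oddiy matematik amal — ikki vektor orasidagi cosine similarity. Sof Python'dagi sketch (real tizimda bu hisobni DB'ning HNSW/IVF index'i millionlab vektor ustida bajaradi):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """1.0 = bir xil yo'nalish, 0 = ortogonal (semantik aloqasiz)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Mini-misol: doc1 va doc2 "o'xshash", doc3 boshqa yo'nalishda
doc1 = [0.9, 0.1, 0.0]
doc2 = [0.8, 0.2, 0.1]
doc3 = [0.0, 0.1, 0.9]

print(cosine_similarity(doc1, doc2) > cosine_similarity(doc1, doc3))  # True
```

pgvector'dagi `<=>` operatori cosine *distance* qaytaradi, ya'ni `1 - similarity` — shuning uchun query'larda `1 - (embedding <=> $1)` ko'rinishi uchraydi.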

HNSW algoritmi — qanday ishlaydi?

HNSW = Hierarchical Navigable Small World
─────────────────────────────────────────
Layer 3 (eng tepa, kam nodes):   O ──── O ──── O
                                 │      │      │
Layer 2:                     O───O──O──O───O
                             │   │  │  │   │
Layer 1:                  O──O───O──O──O───O──O
                          │  │   │  │  │   │  │
Layer 0 (hamma nodes):  O──O──O───O──O──O───O──O──O

Search: top layer'dan boshlanadi — tez, approximate; quyi layer'larga tushib aniqlashtiradi.
O(log n) complexity, 95-99% recall.

pgvector — eng oddiy boshlang'ich

Sizning stack'ingizda PostgreSQL bor bo'lsa, alohida vector DB kerak emas. pgvector 0.5.0 versiyadan beri HNSW index'ni qo'llab-quvvatlaydi. 50M vektorgacha production uchun yetarli.

pgvector.sql
-- Extension
CREATE EXTENSION vector;

-- Embedding jadval
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL,
    content TEXT NOT NULL,
    embedding vector(1536),    -- OpenAI ada-002 dim
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- HNSW index (tez ANN search)
CREATE INDEX idx_documents_embedding 
  ON documents 
  USING hnsw (embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);

-- ivfflat alternative (kichikroq RAM, lekin sekinroq)
-- CREATE INDEX ... USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

-- Query — eng yaqin 10 ta document
SELECT 
    id, content, metadata,
    1 - (embedding <=> $1::vector) AS similarity
FROM documents
WHERE tenant_id = $2
ORDER BY embedding <=> $1::vector    -- cosine distance
LIMIT 10;

-- Operators:
-- <-> Euclidean distance
-- <=> Cosine distance  
-- <#> Inner product (negative)

Qdrant — production default

Rust'da yozilgan; filtered search bo'yicha eng tezlardan biri (vendor benchmark'larida p50 ~4ms @ 1M vectors). Open-source + managed cloud.

qdrant_client.py
from qdrant_client import QdrantClient, AsyncQdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue

client = AsyncQdrantClient(url="https://qdrant.internal:6333", api_key="...")

# Collection yaratish
await client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
)

# Index qo'shish (filter uchun)
await client.create_payload_index(
    collection_name="documents",
    field_name="tenant_id",
    field_schema="keyword"
)

# Batch insert
await client.upsert(
    collection_name="documents",
    points=[
        PointStruct(
            id=doc_id,
            vector=embedding,   # list[float] 1536
            payload={"tenant_id": str(tenant), "title": title, "content": text}
        )
        for doc_id, embedding, tenant, title, text in batch
    ]
)

# Hybrid query — vector + metadata filter
results = await client.search(
    collection_name="documents",
    query_vector=query_embedding,
    query_filter=Filter(
        must=[
            FieldCondition(key="tenant_id", match=MatchValue(value=tenant_id)),
            FieldCondition(key="status", match=MatchValue(value="published"))
        ]
    ),
    limit=10,
    with_payload=True,
    score_threshold=0.7     # low-confidence'ni filter
)

Vector DB landscape — comparison

DB       | Type           | Strength                | Cost (@10M vectors)        | Best for
pgvector | PostgreSQL ext | SQL + vector bir joyda  | ~$40/mo (DigitalOcean VPS) | Start here, <50M vectors
Qdrant   | Purpose-built  | Eng tez filtered, Rust  | ~$100/mo cloud             | Production default
Pinecone | Managed SaaS   | Zero ops, serverless    | $300-500/mo                | Enterprise, compliance
Weaviate | Open + cloud   | Hybrid BM25+vector      | ~$200/mo cloud             | Semantic + keyword search
Milvus   | Open-source    | Billions scale, GPU     | Infra-dependent            | 100M+ vectors
Chroma   | Embedded       | Developer-friendly      | ~$0 self-host              | Prototypes, local AI

Cost breakdown — 10M vectors real case

10M vectors × 1536 dim × 4 bytes = ~60 GB raw
HNSW overhead: ~1.5x = ~90 GB

Pinecone Serverless:
  Storage: $0.33/GB × 90 = $30
  Reads (1M queries/mo × 8 units): $66
  Total: ~$100/mo (low traffic) → $500+ (high)

Qdrant self-hosted (DigitalOcean):
  64 GB RAM + 200 GB SSD: ~$100/mo fixed
  Unlimited queries

Qdrant Cloud: o'xshash spec ~$120/mo

pgvector on existing PostgreSQL:
  ~$0 qo'shimcha xarajat (DB allaqachon bor)
  Lekin index RAM'ga sig'ishi kerak — shuni hisobga oling
Decision tree

0-10M vectors: pgvector ishlating (operatsion complexity yo'q)
10-100M vectors: Qdrant self-hosted
100M+ yoki compliance: Pinecone yoki Milvus
Hybrid search kerak: Weaviate
Prototype: Chroma (bir necha qator kod bilan ishga tushadi)
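Yuqoridagi decision tree'ni kichik funksiya ko'rinishida ham ifodalash mumkin — chegaralar shu bo'limdagi taxminiy qiymatlar, qat'iy qoida emas:

```python
def choose_vector_db(n_vectors: int, need_hybrid: bool = False,
                     need_compliance: bool = False, prototype: bool = False) -> str:
    """Decision tree'ning soddalashtirilgan ifodasi (taxminiy chegaralar)."""
    if prototype:
        return "Chroma"
    if need_hybrid:
        return "Weaviate"
    if n_vectors >= 100_000_000 or need_compliance:
        return "Pinecone yoki Milvus"
    if n_vectors >= 10_000_000:
        return "Qdrant self-hosted"
    return "pgvector"

print(choose_vector_db(5_000_000))    # pgvector
print(choose_vector_db(50_000_000))   # Qdrant self-hosted
```

Amalda bu tanlov jamoa tajribasi va mavjud infra'ga ham bog'liq — funksiya faqat boshlang'ich yo'nalish beradi.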

Quantization — cost kamaytirish

Vektorni float32 → int8 qilsangiz: 4x kichikroq disk, 3-4x tez, faqat <1% recall loss.

quantization.py
# Qdrant da scalar quantization
from qdrant_client.models import ScalarQuantization, ScalarType, ScalarQuantizationConfig

await client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    quantization_config=ScalarQuantization(
        scalar=ScalarQuantizationConfig(
            type=ScalarType.INT8,
            quantile=0.99,
            always_ram=True,  # quantized vektorlar RAM'da
        )
    )
)
# RAM: 60 GB → 15 GB (4x saving)
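Scalar quantization mohiyatini sof Python'da ko'rsatish mumkin: har float qiymat [min, max] oraliqda 256 pog'onaga bo'linib int8 diapazonga siqiladi. Bu soddalashtirilgan sketch — Qdrant ichida shunga o'xshash, lekin ancha optimallashgan amal bajariladi:

```python
def quantize_int8(vec: list[float]) -> tuple[list[int], float, float]:
    """float32 → int8: qiymatlar [lo, hi] oraliqda 256 pog'onaga bo'linadi."""
    lo, hi = min(vec), max(vec)
    scale = (hi - lo) / 255 if hi != lo else 1.0
    q = [round((x - lo) / scale) for x in vec]
    return q, lo, scale

def dequantize_int8(q: list[int], lo: float, scale: float) -> list[float]:
    """Taxminiy qayta tiklash — recall loss shu yerdan keladi."""
    return [lo + v * scale for v in q]

vec = [0.12, -0.45, 0.88, 0.01]
q, lo, scale = quantize_int8(vec)
restored = dequantize_int8(q, lo, scale)
max_err = max(abs(a - b) for a, b in zip(vec, restored))
# Xato har doim yarim pog'onadan (scale / 2) oshmaydi
```

4 bayt o'rniga 1 bayt — "4x kichikroq" raqami aynan shu yerdan. Qdrant bunga qo'shimcha `quantile=0.99` bilan outlier'larni kesib, aniqlikni yana oshiradi.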
06

async systems — task queues, events, messaging

6.1

celery — production-grade task queue

Celery qanday ishlaydi? Production'da qanday qilinsa yaxshi?
javob
Nima uchun task queue? — Synchronous muammosi

Tasavvur qiling: restoran kassasiga mijoz keladi. Kassir "hozir oshpazga borib ovqatingizni pishirib kelaman" desa — navbatdagi 50 ta mijoz kutib qoladi. Task queue — kassirning "chek beraman, keyingisiga o'taman" usuli: mijozga darhol "qabul qilindi" javobi ketadi, oshpaz esa buyurtmani orqada bajaradi.

Celery holatlari: email yuborish (3 sek), PDF generatsiya (10 sek), ML inference (30 sek), report (5 min), nightly batch (soatlar). Bularning hammasini HTTP request ichida bajarish mumkin emas — user kutmaydi, connection timeout bo'ladi, server thread bloklanadi.

Celery arxitekturasi

Producer (FastAPI/Django)      Broker (Redis/RabbitMQ)       Worker pools
task.delay()        ──────▶    queue persistence   ──────▶   #1 fast queue
                               routing, priority             #2 heavy queue
                                                             #N gpu/ml queue
                                                                   │
                                                                   ▼
                                                            Result backend

Features: Retry · DLQ · Rate limit · Priority · Scheduled

Producer task'ni broker'ga yuboradi → broker queue'da saqlaydi → worker pick qiladi →
bajaradi → natijani result backend'ga yozadi.

Production-ready task

tasks.py
from celery import Celery, Task
from celery.exceptions import MaxRetriesExceededError
from smtplib import SMTPRecipientsRefused
import structlog

# email_service va SMTPRateLimited — loyihangizning email moduli deb faraz qilinadi

logger = structlog.get_logger()

app = Celery(
    'myapp',
    broker='redis://redis:6379/0',
    backend='redis://redis:6379/1'
)

app.conf.update(
    task_acks_late=True,              # faqat success'dan keyin ack
    task_reject_on_worker_lost=True,  # worker crash bo'lsa — qayta
    task_track_started=True,
    worker_prefetch_multiplier=1,     # fair distribution
    task_serializer='json',
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_soft_time_limit=300,         # 5 daqiqa soft
    task_time_limit=360,              # 6 daqiqa hard kill
)

app.conf.task_routes = {
    'app.tasks.send_email':      {'queue': 'fast', 'priority': 5},
    'app.tasks.process_payment': {'queue': 'fast', 'priority': 9},
    'app.tasks.generate_report': {'queue': 'heavy', 'priority': 3},
    'app.tasks.embed_document':  {'queue': 'gpu', 'priority': 5},
}

class RetryableTask(Task):
    """Base class — automatic retry with exponential backoff"""
    autoretry_for = (ConnectionError, TimeoutError)
    retry_backoff = True
    retry_backoff_max = 300
    retry_jitter = True
    max_retries = 5

@app.task(
    base=RetryableTask,
    bind=True,
    rate_limit='100/m',
    acks_late=True,
)
def send_email(self, tenant_id: str, to: str, subject: str, body: str):
    logger.info("send_email.start", task_id=self.request.id, tenant=tenant_id)
    try:
        result = email_service.send(to=to, subject=subject, body=body)
        logger.info("send_email.success", task_id=self.request.id)
        return {"status": "sent", "message_id": result.id}
    except SMTPRateLimited as e:
        # 1 daqiqadan keyin qayta
        raise self.retry(exc=e, countdown=60)
    except SMTPRecipientsRefused as e:
        # Qayta urinib bo'lmaydi — DLQ ga
        logger.error("send_email.invalid_recipient", to=to)
        raise  # task_failure_handler DLQ'ga yozadi

Celery Canvas — workflow orchestration

workflows.py
from celery import chain, group, chord

# CHAIN — sequential (birin-ketin)
workflow = chain(
    fetch_user_data.s(user_id),
    enrich_with_analytics.s(),
    generate_report.s(),
    upload_to_s3.s(),
    notify_user.s(user_id)
)
workflow.apply_async()

# GROUP — parallel
header = group(
    process_image.s(img_id) for img_id in image_ids
)
result = header.apply_async()
results = result.join()  # wait all

# CHORD — parallel + callback
workflow = chord(
    [process_image.s(img) for img in images],  # parallel
    combine_images.s()                          # callback
)

# MAP-REDUCE style
reduce_workflow = chord(
    group(analyze_doc.s(doc) for doc in documents),
    aggregate_results.s()
)

Scheduled tasks — Celery Beat

celery_beat.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    # Har 15 daqiqada
    'cleanup-expired-sessions': {
        'task': 'app.tasks.cleanup_sessions',
        'schedule': 900.0,
    },
    # Har kuni 02:00 UTC
    'daily-reports': {
        'task': 'app.tasks.generate_daily_reports',
        'schedule': crontab(hour=2, minute=0),
    },
    # Dushanba ertalab
    'weekly-digest': {
        'task': 'app.tasks.send_weekly_digest',
        'schedule': crontab(day_of_week=1, hour=9, minute=0),
    },
}
6.2

kafka — event streaming platform

Kafka nima? Qachon Celery yetmaydi va Kafka kerak bo'ladi?
javob
Celery vs Kafka — asosiy farq

Celery/RabbitMQ — task queue. Worker task'ni oladi, bajaradi, o'chiradi. Bir task — bir worker. Kafka — immutable event log. Event yozildi → o'chirmaysiz, kunlar/haftalar saqlaydi. Ko'plab mustaqil consumer group'lar bir xil event'ni o'z tezligida o'qiydi.

Misol: foydalanuvchi buyurtma berdi. Celery: "order_confirmation yuborish" vazifasi bitta worker'ga ketadi — bajarildi, tamom. Kafka: "order.placed" event yoziladi → Analytics service (o'z tezligida), Notification service (real-time), ML training pipeline (kechqurun), Accounting service (kunlik batch) — hammasi bir xil event'ni o'qiydi, bir-biriga ta'sir qilmaydi.

Kafka partition — parallel ishlash siri

Kafka topic bir nechta partition'ga bo'linadi. Har partition — tartiblangan log. Har partition (bir consumer group ichida) bitta consumer tomonidan o'qiladi. Demak: 6 partition = 6 parallel consumer = 6x throughput. Ko'proq partition — ko'proq parallellik. Lekin partition sonida muvozanat kerak: juda ko'p partition = ko'p metadata, sekin leader election. Production'da 1 partition ≈ 10-100 MB/s throughput — shunga qarab hisoblang.
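Shu qoidaga asoslanib partition sonini taxminiy hisoblash mumkin. Konservativ 10 MB/s deb olamiz va o'sish uchun zaxira koeffitsiyenti qo'shamiz — real qiymatlar workload'ga bog'liq, bu faqat boshlang'ich baho:

```python
import math

def estimate_partitions(target_mb_per_sec: float,
                        per_partition_mb_per_sec: float = 10.0,
                        headroom: float = 1.5) -> int:
    """Kerakli partition soni: target / per-partition, zaxira bilan."""
    raw = target_mb_per_sec / per_partition_mb_per_sec
    return max(1, math.ceil(raw * headroom))

print(estimate_partitions(100))   # 100 MB/s peak uchun → 15 partition
```

Partition sonini keyin oshirish mumkin, lekin kamaytirib bo'lmaydi va key-based ordering buziladi — shuning uchun boshidan biroz zaxira bilan hisoblash foydali.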

Kafka fundamentals

Kafka topic → partitions → log
─────────────────────────────────
orders topic:
  partition 0: [ev1, ev2, ev3, ev4, ev5, ...]
  partition 1: [ev1, ev2, ev3, ev4, ev5, ...]
  partition 2: [ev1, ev2, ev3, ev4, ev5, ...]

Consumer groups:
  analytics-group:     reads orders (position 4523)
  notifications-group: reads orders (position 4890)
  ml-training-group:   reads orders (position 1200)  ← behind!

Har consumer group alohida position (offset) saqlaydi. Bir event'ni ko'p consumer o'qiydi.

Kafka producer

producer.py
from aiokafka import AIOKafkaProducer
from datetime import datetime, timezone
from uuid import uuid4
import json

class EventProducer:
    def __init__(self):
        self.producer = AIOKafkaProducer(
            bootstrap_servers='kafka1:9092,kafka2:9092,kafka3:9092',
            value_serializer=lambda v: json.dumps(v).encode(),
            key_serializer=lambda k: k.encode() if k else None,
            compression_type='lz4',
            acks='all',                    # all replicas confirm
            enable_idempotence=True,       # exactly-once
            max_in_flight_requests_per_connection=5,
        )
    
    async def start(self):
        await self.producer.start()
    
    async def emit(self, topic: str, event: dict, key: str = None):
        """Event'ni Kafka'ga yuborish"""
        # Envelope — standard metadata
        envelope = {
            "event_id": str(uuid4()),
            "event_type": event["type"],
            "event_version": "v1",
            "occurred_at": datetime.utcnow().isoformat(),
            "tenant_id": event.get("tenant_id"),
            "payload": event.get("payload", {}),
            "trace_id": get_current_trace_id(),
        }
        
        # key — same key → same partition → ordering
        await self.producer.send_and_wait(
            topic=topic,
            value=envelope,
            key=key or envelope.get("tenant_id"),
        )

# Ishlatish
producer = EventProducer()
await producer.start()

await producer.emit(
    topic="orders.events",
    event={
        "type": "order.created",
        "tenant_id": "tenant-123",
        "payload": {"order_id": "o-456", "amount": 99.99}
    },
    key="tenant-123"  # tenant bo'yicha ordering
)

Consumer — exactly-once semantics

consumer.py
from aiokafka import AIOKafkaConsumer

async def run_analytics_consumer():
    consumer = AIOKafkaConsumer(
        'orders.events',
        bootstrap_servers='kafka1:9092,kafka2:9092,kafka3:9092',
        group_id='analytics-v1',           # consumer group
        enable_auto_commit=False,           # manual commit muhim
        auto_offset_reset='earliest',
        value_deserializer=lambda v: json.loads(v.decode()),
        max_poll_records=100,
    )
    await consumer.start()
    
    try:
        async for msg in consumer:
            try:
                # Idempotency check — event'ni qayta ishlatmaslik
                if await already_processed(msg.value["event_id"]):
                    await consumer.commit()
                    continue
                
                # Process
                async with db.transaction():
                    await process_analytics_event(msg.value)
                    await mark_processed(msg.value["event_id"])
                
                # Manual commit — success'dan keyin
                await consumer.commit()
                
            except Exception as e:
                logger.error("consumer.error", error=str(e), offset=msg.offset)
                # Commit qilmaymiz — qayta keladi
                # N marta fail bo'lsa DLQ topic'ga yuboramiz
    finally:
        await consumer.stop()

Celery vs Kafka — tanlash

AspectCelery/RabbitMQKafka
ModelTask queueEvent log
RetentionBajarilgan = o'chirildiKun/hafta/umr saqlanadi
Consumer'lar1 task — 1 consumer1 event — N consumer
Throughput10k msg/sec1M+ msg/sec
LatencyPast (<10ms)O'rtacha (10-50ms)
Ops complexityPastYuqori (ZK/KRaft, partitions)
Use caseEmail, payments, reportsEvent sourcing, analytics, ML pipelines
6.3

event sourcing & cqrs

Event Sourcing va CQRS — nima, qachon, qanday?
javob
Klassik vs Event Sourcing

Klassik CRUD: jadvalda hozirgi state saqlanadi. balance = 100.
Event Sourcing: jadvalda hamma o'zgarishlar saqlanadi. [+100, -20, +50, -30]. Hozirgi state — bu event'lar yig'indisi.

Event Sourcing misol — Wallet

wallet_events.py
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import List

class InsufficientFunds(Exception):
    """Balans yetarli emas"""

class ConcurrencyError(Exception):
    """Optimistic lock to'qnashuvi — expected_version mos kelmadi"""

@dataclass(frozen=True)
class WalletEvent:
    wallet_id: str
    event_type: str
    occurred_at: datetime
    version: int

@dataclass(frozen=True)
class WalletCreated(WalletEvent):
    user_id: str
    currency: str

@dataclass(frozen=True)
class MoneyDeposited(WalletEvent):
    amount: Decimal
    source: str

@dataclass(frozen=True)
class MoneyWithdrawn(WalletEvent):
    amount: Decimal
    destination: str

@dataclass
class Wallet:
    id: str
    user_id: str
    balance: Decimal = Decimal("0")
    version: int = 0
    
    @classmethod
    def from_events(cls, events: List[WalletEvent]) -> "Wallet":
        """State'ni event'lardan qayta tiklash"""
        if not events or not isinstance(events[0], WalletCreated):
            raise ValueError("Need WalletCreated first")
        
        first = events[0]
        wallet = cls(id=first.wallet_id, user_id=first.user_id, version=1)
        
        for event in events[1:]:
            wallet.apply(event)
        return wallet
    
    def apply(self, event: WalletEvent):
        if isinstance(event, MoneyDeposited):
            self.balance += event.amount
        elif isinstance(event, MoneyWithdrawn):
            if self.balance < event.amount:
                raise InsufficientFunds()
            self.balance -= event.amount
        self.version = event.version

# Event store
class EventStore:
    async def append(self, stream_id: str, events: List[WalletEvent], expected_version: int):
        """Optimistic concurrency control"""
        async with db.transaction():
            current = await db.fetch_val(
                "SELECT COALESCE(MAX(version), 0) FROM events WHERE stream_id = $1",
                stream_id
            )
            if current != expected_version:
                raise ConcurrencyError(f"Expected {expected_version}, got {current}")
            
            for event in events:
                await db.execute("""
                    INSERT INTO events (stream_id, event_type, payload, version, occurred_at)
                    VALUES ($1, $2, $3, $4, $5)
                """, stream_id, event.event_type, event_to_json(event), event.version, event.occurred_at)

CQRS — Command Query Responsibility Segregation

Write model (commands) va read model (queries) alohida optimallashtiriladi.

Command ──▶ WRITE side: Write Model ──▶ Event Store (PostgreSQL / Kafka)
                                             │
                                             ▼
                                  Projector (event → read model, async)
                                  ├──▶ Read DB #1: balance summary
                                  ├──▶ Read DB #2: analytics (OLAP)
                                  └──▶ Search index: Elasticsearch
                                             ▲
Query ────▶ READ side ───────────────────────┘

Write: event'ga yoziladi. Projector har consumer uchun o'z read model'ini quradi.
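Projector'ning mohiyati — event stream'ni read model'ga fold qilish. Minimal sketch (Wallet misoliga asoslangan, event'lar faraziy soddalashtirilgan dict ko'rinishida):

```python
def project_balances(events: list[dict]) -> dict[str, float]:
    """Event log → 'wallet_id: balance' read model."""
    balances: dict[str, float] = {}
    for ev in events:
        wid = ev["wallet_id"]
        if ev["type"] == "wallet.created":
            balances[wid] = 0.0
        elif ev["type"] == "money.deposited":
            balances[wid] += ev["amount"]
        elif ev["type"] == "money.withdrawn":
            balances[wid] -= ev["amount"]
    return balances

events = [
    {"type": "wallet.created", "wallet_id": "w1"},
    {"type": "money.deposited", "wallet_id": "w1", "amount": 100.0},
    {"type": "money.withdrawn", "wallet_id": "w1", "amount": 30.0},
]
print(project_balances(events))  # {'w1': 70.0}
```

Xuddi shu event log'dan boshqa projector butunlay boshqa read model (masalan, kunlik tranzaksiya soni) qurishi mumkin — CQRS'ning asosiy kuchi shu.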
Event Sourcing — majburiy emas

Har sistemaga ham kerak emas. Faqat quyidagi hollarda: audit log shart (bank, healthcare); "time travel" — tarixiy state'ni ko'rish kerak; yoki bir nechta read model kerak (analytics, search). Aks holda — klassik CRUD bilan boshlang.

6.4

saga pattern & transactional outbox

Distributed transaction yo'q — microservice'lar qanday birgalikda ishlaydi?
javob

Saga — distributed transaction alternativi

Order yaratish: Order service, Payment service, Inventory service. Birida fail bo'lsa — oldingilarni rollback. 2PC ishlamaydi (network partition, complexity). Saga — bu compensating transactions zanjiri.

Choreography saga — event-driven
saga_choreography.py
"""Har service event publish qiladi, boshqalari subscribe qiladi"""

# Order service
async def create_order(user_id, items):
    order = await db.insert_order(user_id, items, status="pending")
    await events.publish("order.created", {
        "order_id": order.id, "user_id": user_id, 
        "items": items, "total": order.total
    })
    return order

# Payment service (subscribe to order.created)
async def handle_order_created(event):
    try:
        charge = await stripe.charge(event["user_id"], event["total"])
        await events.publish("payment.succeeded", {
            "order_id": event["order_id"], "charge_id": charge.id
        })
    except PaymentError as e:
        await events.publish("payment.failed", {
            "order_id": event["order_id"], "reason": str(e)
        })

# Inventory service (subscribe to payment.succeeded)
async def handle_payment_succeeded(event):
    try:
        await reserve_inventory(event["order_id"])
        await events.publish("inventory.reserved", {"order_id": event["order_id"]})
    except OutOfStock:
        await events.publish("inventory.failed", {"order_id": event["order_id"]})
        # Compensating action
        await events.publish("payment.refund_requested", {"order_id": event["order_id"]})

# Order service (subscribe to all)
async def handle_inventory_reserved(event):
    await db.update_order_status(event["order_id"], "confirmed")

async def handle_payment_failed(event):
    await db.update_order_status(event["order_id"], "cancelled")
Orchestration saga — central coordinator
saga_orchestration.py
"""Central coordinator step'larni boshqaradi"""

class OrderSaga:
    async def execute(self, order_data):
        saga_id = uuid4()
        state = {"order_id": None, "charge_id": None, "inventory_id": None}
        
        try:
            # Step 1: Create order
            order = await order_service.create(order_data)
            state["order_id"] = order.id
            
            # Step 2: Charge payment
            charge = await payment_service.charge(order.total)
            state["charge_id"] = charge.id
            
            # Step 3: Reserve inventory
            inventory = await inventory_service.reserve(order.items)
            state["inventory_id"] = inventory.id
            
            # Step 4: Confirm
            await order_service.confirm(order.id)
            return order
            
        except Exception as e:
            # Rollback — compensating transactions
            if state.get("inventory_id"):
                await inventory_service.release(state["inventory_id"])
            if state.get("charge_id"):
                await payment_service.refund(state["charge_id"])
            if state.get("order_id"):
                await order_service.cancel(state["order_id"])
            raise

Transactional Outbox — reliability pattern

Muammo

DB'ga order yozish va Kafka'ga event yuborish — ikki alohida sistema. Birida commit bo'lib, boshqasida fail bo'lsa — inconsistency. 2PC yo'q. Yechim: outbox.

outbox_pattern.sql
-- Outbox jadvali — business DB'da
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type TEXT NOT NULL,    -- 'order', 'user'
    aggregate_id UUID NOT NULL,
    event_type TEXT NOT NULL,
    payload JSONB NOT NULL,
    occurred_at TIMESTAMPTZ DEFAULT NOW(),
    published_at TIMESTAMPTZ,
    retry_count INT DEFAULT 0
);

CREATE INDEX idx_outbox_unpublished 
  ON outbox (occurred_at) 
  WHERE published_at IS NULL;
outbox.py
# Business logic — ONE transaction, DB + outbox
async def create_order(data):
    async with db.transaction():
        order = await db.insert_order(data)
        await db.insert_outbox({
            "aggregate_type": "order",
            "aggregate_id": order.id,
            "event_type": "order.created",
            "payload": order.to_dict(),
        })
    return order
    # ATOMIC: order va outbox birga commit bo'ladi

# Relay worker — outbox → Kafka
async def outbox_relay():
    while True:
        async with db.transaction():
            rows = await db.fetch_all("""
                SELECT * FROM outbox 
                WHERE published_at IS NULL 
                ORDER BY occurred_at
                LIMIT 100
                FOR UPDATE SKIP LOCKED
            """)
            
            for row in rows:
                try:
                    await kafka.send(
                        topic=f"{row.aggregate_type}.events",
                        value=row.payload,
                        key=str(row.aggregate_id),
                    )
                    await db.execute(
                        "UPDATE outbox SET published_at = NOW() WHERE id = $1",
                        row.id
                    )
                except Exception as e:
                    await db.execute(
                        "UPDATE outbox SET retry_count = retry_count + 1 WHERE id = $1",
                        row.id
                    )
        
        await asyncio.sleep(1)

# Alternative: Debezium (CDC) — PostgreSQL WAL'dan to'g'ridan to'g'ri o'qib Kafka'ga
Idempotency keys — must have

Har event uchun unique event_id. Consumer tomonda "already processed?" check. Bu event'ni 2 marta olish (at-least-once delivery) — normal; 2 marta ishlatish — bug.
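Idempotent consumer g'oyasini minimal sketch'da ko'rsatish mumkin. Bu yerda `IdempotentConsumer` nomi va in-memory to'plam — shartli misol; production'da buning o'rniga Redis `SET NX` yoki DB unique constraint ishlatiladi:

```python
import asyncio

class IdempotentConsumer:
    """Event'ni faqat bir marta qayta ishlaydigan consumer sketch'i"""
    def __init__(self):
        self._processed: set[str] = set()  # demo uchun in-memory; real'da Redis/DB

    async def handle(self, event: dict) -> bool:
        """True = ishlandi, False = duplicate (skip qilindi)"""
        event_id = event["event_id"]
        if event_id in self._processed:
            return False  # at-least-once delivery — duplicate normal holat
        # ... biznes logika shu yerda bajariladi ...
        self._processed.add(event_id)
        return True

async def demo():
    consumer = IdempotentConsumer()
    ev = {"event_id": "evt-1", "type": "order.created"}
    first = await consumer.handle(ev)
    second = await consumer.handle(ev)   # xuddi shu event ikkinchi marta keldi
    return first, second

print(asyncio.run(demo()))  # → (True, False)
```

E'tibor bering: "already processed" belgisi va biznes amal bir atomic operatsiyada bo'lishi kerak — aks holda crash orasida event yana qayta ishlanishi mumkin.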

07

real-time systems

7.1

websocket — bidirectional real-time

Chat, live notifications, collaborative editing — qanday qilinadi?
javob

WebSocket vs polling vs SSE — tanlash

Texnika        | Direction             | Overhead            | Use case
Short polling  | Client → Server       | Juda yuqori         | Eng oddiy case
Long polling   | Client → Server       | Yuqori              | Legacy fallback
SSE            | Server → Client only  | Past                | Notifications, LLM streaming
WebSocket      | Bidirectional         | Eng past            | Chat, collab, games
WebRTC         | P2P                   | Past (after setup)  | Video, voice, low-latency

FastAPI WebSocket + Redis pub-sub

ws_chat.py
from fastapi import WebSocket, WebSocketDisconnect
import redis.asyncio as redis
import asyncio
import json
import time

class ConnectionManager:
    """Multi-instance WebSocket via Redis pub-sub"""
    def __init__(self):
        self.local: dict[str, set[WebSocket]] = {}  # room_id -> connections
        self.redis = redis.Redis.from_url("redis://redis:6379")
        self.pubsub = self.redis.pubsub()
    
    async def connect(self, ws: WebSocket, room_id: str, user_id: str):
        await ws.accept()
        self.local.setdefault(room_id, set()).add(ws)
        
        # Redis subscription (agar bu birinchi connection shu room'ga)
        if len(self.local[room_id]) == 1:
            await self.pubsub.subscribe(f"room:{room_id}")
        
        # Join notification (boshqa instance'lar ham eshitadi)
        await self.redis.publish(f"room:{room_id}", json.dumps({
            "type": "user.joined", "user_id": user_id
        }))
    
    async def disconnect(self, ws: WebSocket, room_id: str):
        self.local.get(room_id, set()).discard(ws)
        if not self.local.get(room_id):
            await self.pubsub.unsubscribe(f"room:{room_id}")
    
    async def broadcast_to_room(self, room_id: str, message: dict):
        # Redis orqali — hamma instance'ga
        await self.redis.publish(f"room:{room_id}", json.dumps(message))
    
    async def redis_listener(self):
        """Redis'dan kelgan message'larni local WebSocket'larga forward"""
        async for message in self.pubsub.listen():
            if message["type"] != "message":
                continue
            room_id = message["channel"].decode().split(":")[1]
            data = json.loads(message["data"])
            
            for ws in self.local.get(room_id, set()):
                try:
                    await ws.send_json(data)
                except Exception:
                    pass  # connection broke

manager = ConnectionManager()

@app.on_event("startup")   # eskirgan API — yangi kodda lifespan ishlatish tavsiya etiladi
async def startup():
    asyncio.create_task(manager.redis_listener())

@app.websocket("/ws/room/{room_id}")
async def ws_endpoint(ws: WebSocket, room_id: str):
    user = await authenticate_ws(ws)  # JWT in query param
    await manager.connect(ws, room_id, user.id)
    try:
        while True:
            data = await ws.receive_json()
            await manager.broadcast_to_room(room_id, {
                "type": "message",
                "user_id": user.id,
                "content": data["content"],
                "timestamp": time.time(),
            })
    except WebSocketDisconnect:
        await manager.disconnect(ws, room_id)

Production considerations

  • Heartbeat/ping-pong — har 30 sekundda ping, aks holda dead connection sezilmaydi
  • Reconnection logic — client'da exponential backoff
  • Message persistence — offline user uchun queue
  • Rate limiting — per-connection, spam'ni to'xtatish
  • Sticky sessions — LB (Load Balancer) bir connection'ni bir server'ga yo'naltiradi
  • Graceful shutdown — deploy vaqtida connection'lar sekin ko'chadi
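Ro'yxatdagi birinchi band — heartbeat — mantig'ini soddalashtirilgan sketch'da ko'rish mumkin. Interval qiymatlari shartli; real kodda server `ws.send_json({"type": "ping"})` yuboradi va client'dan pong kelganda `on_pong()` chaqiriladi:

```python
import time

class Heartbeat:
    """Oddiy heartbeat tracker: 2 interval davomida pong kelmasa — dead"""
    def __init__(self, interval: float = 30.0):
        self.interval = interval
        self.last_pong = time.monotonic()

    def on_pong(self) -> None:
        # Client'dan pong kelganda chaqiriladi
        self.last_pong = time.monotonic()

    def is_alive(self) -> bool:
        # 2 interval davomida pong yo'q — connection'ni yopish kerak
        return (time.monotonic() - self.last_pong) < self.interval * 2

hb = Heartbeat(interval=30.0)
hb.on_pong()
print(hb.is_alive())   # → True
```

Dead deb topilgan connection `local` to'plamdan olib tashlanadi — aks holda broadcast har safar o'lik socket'larga yozishga urinadi.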
7.2

server-sent events (sse) — llm streaming uchun ideal

ChatGPT javobni "yozayotgan"dek ko'rsatadi — bu qanday ishlaydi?
javob
SSE = Server-Sent Events

Server → Client bir tomonlama stream. Oddiy HTTP ustida ishlaydi (HTTP/1.1 ham, HTTP/2 ham). WebSocket'dan soddaroq: auto-reconnect, text-based format. LLM response streaming, live notifications, stock tickers uchun — ideal.

llm_sse.py
from fastapi import Request
from fastapi.responses import StreamingResponse
from anthropic import AsyncAnthropic
import json

client = AsyncAnthropic()

@app.post("/api/chat/stream")
async def chat_stream(request: Request, body: ChatRequest):
    async def event_stream():
        try:
            async with client.messages.stream(
                model="claude-opus-4-7",
                max_tokens=2048,
                system=body.system_prompt,
                messages=body.messages,
            ) as stream:
                async for text in stream.text_stream:
                    # Client disconnected?
                    if await request.is_disconnected():
                        break
                    
                    # SSE format: "data: \n\n"
                    yield f"data: {json.dumps({'type': 'token', 'text': text})}\n\n"
                
                # Final message
                final = await stream.get_final_message()
                yield f"data: {json.dumps({'type': 'done', 'usage': final.usage.model_dump()})}\n\n"
                
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
    
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",   # nginx buffering o'chirish
        }
    )
client.js
// Browser tomon — EventSource API
// Eslatma: EventSource faqat GET so'rov yuboradi. Yuqoridagi POST endpoint
// uchun fetch() + ReadableStream kerak; soddalik uchun bu yerda GET variant.
const eventSource = new EventSource('/api/chat/stream?q=hello');

eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    if (data.type === 'token') {
        appendToUI(data.text);   // tokenlarni UI'da ko'rsatamiz
    } else if (data.type === 'done') {
        eventSource.close();
        console.log('Usage:', data.usage);
    }
};

eventSource.onerror = () => {
    // Browser avtomatik reconnect qiladi
    console.log('SSE error, retrying...');
};

SSE vs WebSocket tanlash

SSE — qachon
  • Faqat server → client kerak
  • LLM streaming
  • Live notifications
  • Stock price, sports scores
  • Auto-reconnect kerak
WebSocket — qachon
  • Bidirectional kerak (chat)
  • Low latency critical (games)
  • Binary data (files)
  • Collaborative editing
Part III

infrastructure & platform

Python mastery, containers, Kubernetes, cloud, observability. Code yozish — 30%. Qolgan 70% — uni ishga tushirish, monitor qilish, xavfsiz saqlash. 2026-da DevOps — backend'ning ajralmas qismi.

08

python & fastapi mastery

8.1

async/await — event loop mexanizmi

async/await qanday ishlaydi? GIL, threads, processes — qachon qaysi?
javob
Async nima va nima uchun kerak? — Oshpaz analogiyasi

Tasavvur qiling: oshpaz bitta ovqat pishiryapti — 20 daqiqa kutadi, hech narsa qilmaydi. Bu synchronous. Aqlli oshpaz esa: pastani qo'yadi, u qaynaguncha sous tayyorlaydi, keyin non kesadi, taymer chalinganda pastani tekshiradi — bitta oshpaz, bir vaqtda ko'p ish. Bu async.

Python'da: await db.query() — "DB javob berguncha boshqa task'larni bajara beraman". Event loop bitta thread'da yuzlab network request'ni parallel boshqaradi. I/O (network, DB, fayl) kutishi kerak bo'lganda boshqasiga o'tadi. CPU hisoblash (AI model, compression) uchun esa async yetmaydi — multiprocessing kerak.

GIL — Python'ning eng ko'p noto'g'ri tushuniladigan muammosi

Python'da GIL (Global Interpreter Lock) bir vaqtda faqat bitta thread Python kodi bajarishiga ruxsat beradi. Ya'ni threading.Thread bilan CPU hisoblashni parallel qilolmaysiz — thread'lar navbat bilan ishlaydi. Yechim: multiprocessing — har process alohida Python interpreter (alohida GIL). Yoki ProcessPoolExecutor bilan FastAPI'da CPU task'larni pool'ga yuborish. Async I/O ga GIL ta'sir qilmaydi — await vaqtida GIL boshqasiga beriladi.

Event loop — single-thread concurrency

Event loop tushunchasi

Bitta thread. Event loop — cheksiz cycle. "Hozir kim ishga tayyor?" deb so'raydi. await — "men hozir kutaman, boshqa korutin ishlasin". Shuning uchun 1000 ta network request bitta thread'da parallel ishlay oladi.

async vs threading vs multiprocessing
─────────────────────────────────────
I/O bound (network, DB, file):
  async > threading > multiprocessing
  1000+ concurrent: faqat async

CPU bound (math, ML, image processing):
  multiprocessing > threading ≈ async
  GIL sababli threading CPU'ni bir vaqtda ishlata olmaydi

Mixed (web server):
  async (I/O) + process pool (CPU tasks)
  uvicorn --workers 4 → 4 process, har birida async loop
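Event loop'ning kuchini kichik misolda tekshirib ko'rish mumkin: 100 ta "I/O kutish" (`asyncio.sleep`) bitta thread'da deyarli bir vaqtda tugaydi:

```python
import asyncio
import time

async def io_task(i: int) -> int:
    await asyncio.sleep(0.1)   # "network kutish" — event loop boshqa task'ga o'tadi
    return i

async def main() -> float:
    start = time.monotonic()
    results = await asyncio.gather(*[io_task(i) for i in range(100)])
    assert len(results) == 100
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")   # sequential bo'lsa ~10s; bu yerda ~0.1s
```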
async_patterns.py
import asyncio
import aiohttp
from asyncio import Semaphore, TaskGroup

# Pattern 1 — parallel requests with gather
async def fetch_all_fast(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        async def fetch_json(url: str) -> dict:
            async with session.get(url) as r:
                return await r.json()   # response body'ni JSON sifatida o'qiymiz
        results = await asyncio.gather(
            *[fetch_json(u) for u in urls], return_exceptions=True
        )
        return [r for r in results if not isinstance(r, BaseException)]

# Pattern 2 — concurrency limit with Semaphore
async def fetch_with_limit(urls: list[str], limit: int = 10):
    sem = Semaphore(limit)
    async with aiohttp.ClientSession() as session:  # bitta session — hamma request uchun
        async def bounded_fetch(url):
            async with sem:  # bir vaqtda max `limit` ta parallel
                async with session.get(url) as r:
                    return await r.json()
        
        return await asyncio.gather(*[bounded_fetch(url) for url in urls])

# Pattern 3 — TaskGroup (Python 3.11+) — structured concurrency
async def process_user_data(user_id: str):
    async with TaskGroup() as tg:
        profile_task = tg.create_task(fetch_profile(user_id))
        orders_task = tg.create_task(fetch_orders(user_id))
        prefs_task = tg.create_task(fetch_preferences(user_id))
    # Bu yerga yetganda — hammasi yakunlangan
    # Birortasi xato bersa — TaskGroup hammasini cancel qiladi
    return {
        "profile": profile_task.result(),
        "orders": orders_task.result(),
        "preferences": prefs_task.result(),
    }

# Pattern 4 — async + process pool (CPU-bound)
from concurrent.futures import ProcessPoolExecutor

process_pool = ProcessPoolExecutor(max_workers=4)

async def process_image_async(image_path: str):
    loop = asyncio.get_running_loop()  # async kontekstda get_event_loop() o'rniga
    # CPU-bound ishni process pool'ga tashlaymiz
    result = await loop.run_in_executor(
        process_pool, 
        heavy_image_processing,
        image_path
    )
    return result

# Pattern 5 — timeout bilan
async def fetch_with_timeout(url: str):
    try:
        async with asyncio.timeout(5):  # Python 3.11+
            async with aiohttp.ClientSession() as s:
                async with s.get(url) as r:
                    return await r.json()
    except asyncio.TimeoutError:
        return None

Async anti-patterns — qilmaslik kerak

anti_patterns.py
import asyncio
import requests  # sync HTTP kutubxona — faqat anti-pattern misoli uchun

# ✗ ANTI-PATTERN 1: blocking call async funcda
async def bad_1():
    response = requests.get("https://api.example.com")  # BLOCKS EVENT LOOP!
    return response.json()

# ✓ To'g'ri
async def good_1():
    async with aiohttp.ClientSession() as s:
        async with s.get("https://api.example.com") as r:
            return await r.json()


# ✗ ANTI-PATTERN 2: sync loop async'da
async def bad_2(users):
    results = []
    for user in users:
        r = await fetch_user(user)  # sequential!
        results.append(r)
    return results

# ✓ To'g'ri — parallel
async def good_2(users):
    return await asyncio.gather(*[fetch_user(u) for u in users])


# ✗ ANTI-PATTERN 3: CPU-bound async'da
async def bad_3(data):
    return heavy_computation(data)  # event loop bloklandi
    
# ✓ To'g'ri
async def good_3(data):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(process_pool, heavy_computation, data)


# ✗ ANTI-PATTERN 4: fire-and-forget without tracking
async def bad_4():
    asyncio.create_task(send_email(...))  # reference saqlanmadi — GC yeyishi mumkin!
    
# ✓ To'g'ri
bg_tasks = set()
async def good_4():
    task = asyncio.create_task(send_email(...))
    bg_tasks.add(task)
    task.add_done_callback(bg_tasks.discard)
8.2

pydantic v2 & modern typing

Pydantic nima va u bilan qanday professional ishlash?
javob

Pydantic v2 — production patterns

models.py
from pydantic import BaseModel, Field, field_validator, model_validator
from pydantic import ConfigDict
from typing import Annotated, Literal
from decimal import Decimal
from datetime import datetime
from uuid import UUID

class OrderItem(BaseModel):
    model_config = ConfigDict(
        frozen=True,                   # immutable
        str_strip_whitespace=True,
        extra="forbid",                # noma'lum field → error
    )
    
    product_id: UUID
    quantity: Annotated[int, Field(gt=0, le=1000)]
    unit_price: Annotated[Decimal, Field(gt=0, decimal_places=2)]

class OrderCreate(BaseModel):
    model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")
    
    user_id: UUID
    items: Annotated[list[OrderItem], Field(min_length=1, max_length=100)]
    shipping_address: str
    currency: Literal["USD", "EUR", "GBP", "UZS"] = "USD"
    notes: str | None = Field(default=None, max_length=500)
    
    @field_validator("shipping_address")
    @classmethod
    def validate_address(cls, v: str) -> str:
        if len(v.split(",")) < 3:
            raise ValueError("Format: street, city, country")
        return v
    
    @model_validator(mode="after")
    def validate_total(self) -> "OrderCreate":
        total = sum(i.quantity * i.unit_price for i in self.items)
        if total > Decimal("100000"):
            raise ValueError("Order exceeds 100k — manual approval needed")
        return self

# Environment config — pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_prefix="APP_")
    
    database_url: str
    redis_url: str = "redis://localhost:6379"
    openai_api_key: str
    secret_key: str = Field(min_length=32)
    debug: bool = False
    allowed_origins: list[str] = []
    
    @field_validator("database_url")
    @classmethod
    def validate_db(cls, v: str) -> str:
        if not v.startswith(("postgresql://", "postgresql+asyncpg://")):
            raise ValueError("Must be PostgreSQL URL")
        return v

settings = Settings()  # avtomatik .env'dan o'qiydi

Type hints — zamonaviy Python (3.12+)

typing_modern.py
from typing import Protocol, Self
from collections.abc import Awaitable, Callable
from uuid import UUID
import asyncio

# 3.12+ da TypeVar/ParamSpec e'lon qilish shart emas —
# `class Repository[T]` va `def with_retry[T, **P]` sintaksisi ishlatiladi

# Generic class
class Repository[T]:
    async def find(self, id: UUID) -> T | None: ...
    async def save(self, entity: T) -> T: ...

# Protocol — structural typing (duck typing with types)
class Cacheable(Protocol):
    def cache_key(self) -> str: ...
    def ttl(self) -> int: ...

async def cache_item(item: Cacheable) -> None:
    key = item.cache_key()
    ttl = item.ttl()
    await redis.setex(key, ttl, item.model_dump_json())

# Self type (3.11+)
class QueryBuilder:
    def where(self, **kwargs) -> Self:
        self._filters.update(kwargs)
        return self  # typed as Self, so chaining works
    
    def order_by(self, field: str) -> Self:
        self._order = field
        return self

# Decorator with ParamSpec
def with_retry[T, **P](max_attempts: int = 3):
    def decorator(fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            for attempt in range(max_attempts):
                try:
                    return await fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise
                    await asyncio.sleep(2 ** attempt)
        return wrapper
    return decorator
8.3

fastapi — zero to production mastery

FastAPI'ni chuqur o'rganib, production-ready tizim qurish — poydevordan to professional darajagacha.
javob
FastAPI — bu nima va u qanday ishlaydi?

FastAPI — Python'da web API yaratish uchun zamonaviy framework. Uning "sehri" uch asosiy ustunga tayanadi:

1. Starlette (ASGI) — async HTTP server framework. Django WSGI'dan farqli: bir vaqtda minglab connection'larni bitta thread'da boshqaradi (event loop). WebSocket, SSE, background task — hammasi native.

2. Pydantic — Python type hint'larini runtime validation'ga aylantiradi. Siz user_id: int deb yozasiz — FastAPI avtomatik: (a) request'dan oladi, (b) int'ga convert qiladi, (c) noto'g'ri bo'lsa 422 qaytaradi, (d) OpenAPI schema'ga qo'shadi. Bu — boshqa framework'larda qo'lda yozilishi kerak bo'lgan yuzlab satr kod.

3. Dependency Injection — har endpoint uchun kerakli narsalarni (DB session, authenticated user, config) "so'rab olish" mexanizmi. Test yozish oson — fake dependency inject qilasiz, haqiqiy DB kerak emas.

Nima uchun Django/Flask o'rniga FastAPI?

Django: full-featured (ORM, admin, auth built-in), lekin sync va og'ir. AI backend uchun bu to'plamning katta qismi ortiqcha yuk.

Flask: yengil, lekin har narsani qo'lda qilish kerak — validation, serialization, async handling, OpenAPI docs. 2015-yilda yaxshi edi, hozir eskirgan.

FastAPI: Flask'ning oddiyligi + Django'ning ishonchliligi + 2020-yillar zamonaviy Python (type hints, async). Benchmark'larda Flask'dan 3-5x tez. OpenAPI docs avtomatik. LLM integratsiyasi uchun eng yaxshi tanlov.

Loyiha strukturasi — production standard

Yaxshi loyiha strukturasi kelajakdagi ko'p muammoning oldini oladi. Quyidagi struktura katta production loyihalarda sinovdan o'tgan — ko'plab yirik kompaniyalarning backend'larida shunga o'xshash yondashuv ishlatiladi. Har qatlam aniq javobgarlikka ega:

myapp/                        # project root
├─ app/                       # main application code
│  ├─ main.py                 # FastAPI() instance, lifespan, middleware
│  ├─ config.py               # Pydantic Settings — env variables
│  ├─ database.py             # async engine, session factory, pool
│  ├─ dependencies.py         # shared DI: get_db, get_user, get_tenant
│  ├─ api/                    # HTTP layer — routes & request parsing
│  │  ├─ v1/                  # orders.py · users.py · products.py · auth.py
│  │  └─ health.py            # liveness + readiness probes
│  ├─ services/               # business logic — DB'dan ajratilgan
│  │  └─ order_service.py · payment_service.py · email_service.py
│  ├─ models/                 # data shapes
│  │  ├─ domain.py            # SQLAlchemy ORM
│  │  └─ schemas.py           # Pydantic I/O
│  └─ middleware/             # auth, logging, CORS, request ID
├─ tests/
│  ├─ conftest.py             # pytest fixtures
│  ├─ unit/                   # service logic tests
│  └─ integration/            # API + DB tests
├─ alembic/
│  ├─ versions/               # DB migrations — version-controlled schema
│  └─ env.py                  # alembic config
├─ pyproject.toml             # uv/poetry config
├─ .env                       # environment secrets
├─ Dockerfile                 # production container
└─ docker-compose.yml         # local dev stack

Dev tools: ruff (linter + formatter) · mypy (type checker) ·
pytest-asyncio (async test runner) · uvicorn (ASGI production server)

Request flow: api/ → services/ → models/ → DB — har qatlam o'ziga xos
javobgarlikka ega (separation of concerns)

Modular arxitektura — har qatlam nima qiladi?

Yaxshi arxitektura "har narsa o'z joyida" tamoyiliga asoslanadi. Kodingizning har bir qismi aniq javobgarlikka ega bo'lishi, boshqa qismga ortiqcha bog'liq bo'lmasligi kerak. Bu Clean Architecture va Hexagonal Architecture tamoyillari hosilasi.

HTTP REQUEST — POST /api/v1/orders
        │
        ▼
① API LAYER — api/v1/orders.py
   HTTP protokol, request parsing, response formatting
   Javobgarligi: Pydantic validation · Depends() · status code · OpenAPI
        │ calls
        ▼
② SERVICE LAYER — services/order_service.py
   Biznes logika — asosiy qoidalar va ishlash tartibi
   Javobgarligi: validation · transaction · multiple repo calls · events
        │ calls
        ▼
③ REPOSITORY LAYER — repositories/order_repo.py
   Ma'lumotlar bilan ishlash — DB queries, caching
   Javobgarligi: SQL · transactions · migrations · query optimization
        │
        ▼
   PostgreSQL — primary data store (SQLAlchemy async)
   Redis — cache · sessions · rate limiter
   Kafka / RabbitMQ — event bus · background tasks

TOP-DOWN DEPENDENCY: yuqori qatlam pastni biladi, aksi emas.
Asosiy qoida: API layer DB'ga TO'G'RIDAN-TO'G'RI kirmaydi — Service orqali o'tadi.
Nima uchun layerlash muhim?

1-misol: "Order yaratilganda email yuborish" qoidasini o'zgartirish kerak. Layerlanmagan kodda: orders.py, payments.py, api/admin.py — barcha joyda email yuborish kodini topib o'zgartirish kerak. Layerlangan kodda: faqat services/order_service.py — bitta joy, bitta o'zgarish.

2-misol: PostgreSQL'dan MongoDB'ga migratsiya kerak. Layerlanmagan: butun kodni qayta yozish. Layerlangan: faqat repositories/ qatlamini yangidan yozasiz — service va API qatlamlari tegmaydi.

3-misol: Test yozish. Layerlanmagan: har test uchun real DB, real email server ulash kerak. Layerlangan: fake repository inject qilasiz — bir soniyada 1000 ta test.
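3-misoldagi g'oyani sketch sifatida ko'rsatamiz — klass nomlari shartli, bu hujjatdagi `OrderService`'ning juda soddalashtirilgan versiyasi; muhimi — repository tashqaridan inject qilinadi va test uchun fake bilan almashtiriladi:

```python
import asyncio
from uuid import uuid4, UUID

class FakeOrderRepo:
    """In-memory repository — test uchun, real DB kerak emas"""
    def __init__(self):
        self.saved: dict[UUID, dict] = {}

    async def save(self, order: dict) -> dict:
        order["id"] = uuid4()
        self.saved[order["id"]] = order
        return order

class SimpleOrderService:
    def __init__(self, repo):          # repo tashqaridan beriladi (DI)
        self.repo = repo

    async def create(self, user_id: UUID, total: float) -> dict:
        if total <= 0:                 # biznes qoida — service'da
            raise ValueError("total must be positive")
        return await self.repo.save({"user_id": user_id, "total": total})

# "Test": real DB'siz, millisekundlarda ishlaydi
repo = FakeOrderRepo()
service = SimpleOrderService(repo)
order = asyncio.run(service.create(uuid4(), 99.0))
print(len(repo.saved))   # → 1
```

FastAPI'da xuddi shu almashtirish `app.dependency_overrides` orqali qilinadi — endpoint kodiga tegmasdan.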

FastAPI request-to-response — to'liq hayot sikli

Tasavvur qiling: foydalanuvchi POST /api/v1/orders so'rovini yubordi. Bitta "klik" ortida o'ndan ortiq alohida qadam bajariladi — har birining o'z vazifasi bor. Quyidagi diagrammada har komponent nima qilishi va qayerda qancha vaqt ketishi ko'rsatilgan:

📱 CLIENT — POST /api/v1/orders + JSON body

① Uvicorn (ASGI server)
   HTTP parsing → ASGI scope + receive/send channels · ~0.5ms

② MIDDLEWARE CHAIN (onion model — so'rov tepadan pastga, javob pastdan tepaga)
   2a. CORS check — Origin header verify · preflight OPTIONS
   2b. Request ID + Logger — X-Request-ID generate → structured log
   2c. Rate limiter — token bucket check (Redis) · 429 if exceeded

③ Router matching
   POST /api/v1/orders → create_order() funksiyasi · path params extraction

④ DEPENDENCY INJECTION (Depends() graph resolution)
   4a. get_db() — connection pool'dan session olish · yield
   4b. get_current_user() — Bearer token → JWT decode → DB user lookup
   4c. get_tenant() — user'dan tenant_id olish → tenant context

⑤ Pydantic body validation
   OrderCreate(product_id=UUID, quantity=int>0) · 422 if invalid

⑥ HANDLER — create_order() — siz yozgan kod ishga tushadi
   6a. Service call — order_service.create(data, user, tenant)
   6b. Business logic — validate inventory · check credit · apply discount
   6c. Repository call — repo.save(order) → PostgreSQL INSERT (5-15ms)
   6d. Background task — send_email.delay(order_id) · notify_warehouse()

⑦ Response serialization
   OrderResponse (Pydantic) → JSON · secret fields exclude

⑧ Middleware reverse
   Response header'lar qo'shish (X-Request-ID, X-Response-Time) · logging · CORS

HTTP 201 + JSON
Onion model — middleware qanday ishlaydi?

Middleware'lar piyoz qatlamlari kabi ishlaydi. So'rov tashqi qatlamdan kirib, ichki yadroga yetadi — har qatlam kirishda bir narsa qiladi (masalan, auth verify). Javob qaytishda esa har qatlam chiqishda boshqa narsa qiladi (masalan, response header qo'shish). Shuning uchun CORS oxirgi javobga ham ta'sir qiladi, birinchi request'ga ham.

Middleware tartibi muhim: CORS → RequestID → RateLimit → Auth → Handler. RateLimit Auth'dan oldin tursa — bot'lar DB'ga yetmasdan cheklanadi. Auth oldin tursa — har bir bot so'rovi avval JWT tekshiruvi uchun DB'ga murojaat qiladi va faqat keyin cheklanadi.
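Onion modelni framework'siz, sof Python'da soddalashtirilgan sketch bilan ko'rsatish mumkin — nomlar shartli, maqsad faqat kirish/chiqish tartibini ko'rsatish:

```python
calls: list[str] = []

def make_middleware(name: str):
    """Handler'ni o'rab, kirish va chiqishda belgi qoldiradigan qatlam"""
    def middleware(handler):
        def wrapped(request):
            calls.append(f"{name}:in")     # so'rov kirishda
            response = handler(request)
            calls.append(f"{name}:out")    # javob qaytishda
            return response
        return wrapped
    return middleware

def handler(request):
    calls.append("handler")
    return "201"

# add_middleware mantig'i: keyinroq qo'shilgan qatlam tashqariroq bo'ladi
app = handler
for name in ["RequestID", "RateLimit", "CORS"]:   # qo'shilish tartibi
    app = make_middleware(name)(app)

app("POST /orders")
print(calls)
# → ['CORS:in', 'RateLimit:in', 'RequestID:in', 'handler',
#    'RequestID:out', 'RateLimit:out', 'CORS:out']
```

Natijada CORS (eng tashqi) so'rovni birinchi ko'radi va javobni oxirgi bo'lib qayta ishlaydi — aynan piyoz qatlamlari kabi.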

Kod misoli — to'liq modular API

Endi yuqoridagi arxitekturani amalda ko'ramiz. "Order yaratish" endpoint'ini 4 qatlamda yozamiz — kichik qismlarga bo'lib, har biri nima qilishini aniq tushunish uchun.

1. Application entry point — app/main.py

Bu — ilovaning "kirish eshigi". FastAPI instance yaratiladi, resurslar (DB, Redis, Kafka) ishga tushirilib, middleware'lar ulanadi. Avval eng muhim qismi — lifespan funksiyasi:

app/main.py — lifespan (resurs boshqaruvi)
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.config import settings
from app.database import create_db_pool
# create_redis_pool va create_kafka_producer — shartli helper'lar (o'z modullaringizda)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: resurslarni bir marta yaratamiz
    app.state.db_pool = await create_db_pool(settings.DATABASE_URL)
    app.state.redis = await create_redis_pool(settings.REDIS_URL)
    app.state.kafka = await create_kafka_producer(settings.KAFKA_URL)

    yield  # ← Shu yerda ilova ishlaydi

    # Shutdown: resurslarni toza yopamiz
    await app.state.kafka.stop()
    await app.state.redis.close()
    await app.state.db_pool.close()

Nima qilyapmiz? lifespan — maxsus async context manager. yield belgisi gacha bo'lgan kod app ishga tushganda bir marta ishlaydi, yield'dan keyingi kod — app to'xtaganda (graceful shutdown). Bu muhim, chunki connection pool'larni bir marta yaratib, har request'da qayta yaratmaymiz. app.state — FastAPI'ning global saqlash joyi, ichidagi har narsaga middleware va endpoint'lardan kirish mumkin.

Keyin FastAPI obyektini yaratamiz va middleware'larni qo'shamiz. Tartib muhim:

app/main.py — middleware stack
from fastapi.middleware.cors import CORSMiddleware
from app.middleware import RequestIDMiddleware, LoggingMiddleware
from app.api.v1.router import api_router

app = FastAPI(
    title=settings.APP_NAME,
    version=settings.VERSION,
    lifespan=lifespan,
    docs_url="/docs" if settings.DEBUG else None,
)

# Tartib: oxirgi qo'shilgan — birinchi ishga tushadi (onion model)
app.add_middleware(RequestIDMiddleware)    # eng ichki
app.add_middleware(LoggingMiddleware)
app.add_middleware(CORSMiddleware,          # eng tashqi
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
)

app.include_router(api_router, prefix="/api/v1")

Piyoz (onion) tamoyili: FastAPI (Starlette) middleware'larni teskari tartibda qo'llaydi — oxirgi qo'shilgan eng tashqi qatlam bo'ladi. CORS eng oxirida qo'shilgani uchun birinchi bo'lib ishga tushadi (to'g'ri, chunki CORS tekshiruvi so'rov kirishidanoq bo'lishi kerak). RequestIDMiddleware esa eng ichki qatlam — har so'rovga unique ID beradi va keyin log'larda shu ID bo'yicha kuzatish mumkin.

2. API qatlami — app/api/v1/orders.py

Bu qatlam faqat HTTP bilan gaplashadi. Biznes logika bu yerda yo'q. Handler maksimal yupqa bo'lishi kerak — faqat request'dan data'ni olish, service'ni chaqirish, response qaytarish:

app/api/v1/orders.py — create endpoint
from fastapi import APIRouter, Depends, status
from app.models.schemas import OrderCreate, OrderResponse
from app.services.order_service import OrderService
from app.dependencies import get_order_service, get_current_user
from app.models.domain import User

router = APIRouter(prefix="/orders", tags=["orders"])

@router.post("", response_model=OrderResponse, status_code=201)
async def create_order(
    data: OrderCreate,
    current_user: User = Depends(get_current_user),
    service: OrderService = Depends(get_order_service),
) -> OrderResponse:
    order = await service.create_order(data=data, user=current_user)
    return OrderResponse.from_domain(order)

Diqqat qiling: handler faqat 2 qatorlik ish qiladi — service'ni chaqiradi va response qaytaradi. Qolgan hammasini FastAPI o'zi qiladi: data: OrderCreate — request body'ni Pydantic bilan avtomatik validate qiladi; Depends(get_current_user) — JWT token'ni tekshirib user'ni DB'dan topib beradi; response_model=OrderResponse — javobni Pydantic schema bo'yicha serialize qiladi. Siz yozmagan, lekin hammasi ishlaydi.

Ro'yxat endpointi biroz murakkabroq — pagination kerak:

app/api/v1/orders.py — list endpoint
@router.get("", response_model=OrderListResponse)
async def list_orders(
    pagination: PaginationParams = Depends(get_pagination),
    current_user: User = Depends(get_current_user),
    service: OrderService = Depends(get_order_service),
) -> OrderListResponse:
    orders, total = await service.list_user_orders(
        user_id=current_user.id,
        limit=pagination.limit,
        offset=pagination.offset,
    )
    return OrderListResponse(
        data=[OrderResponse.from_domain(o) for o in orders],
        total=total,
        page=pagination.page,
    )

Pagination ham Depends orqali keladi — ?page=2&per_page=20 query parametrlarni olib, validate qilib, tayyor obyekt beradi. Bu pattern har endpoint'da qayta ishlatiladi — bir marta yozib, hamma joyda kerak bo'lganda Depends(get_pagination).
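`get_pagination` taxminan shunday ko'rinishi mumkin — bu sketch'da validatsiya qo'lda yozilgan; FastAPI'da chegaralar `Query(1, ge=1)` kabi deklarativ beriladi, nomlar shartli:

```python
from dataclasses import dataclass

@dataclass
class PaginationParams:
    page: int
    per_page: int

    @property
    def limit(self) -> int:
        return self.per_page

    @property
    def offset(self) -> int:
        return (self.page - 1) * self.per_page

def get_pagination(page: int = 1, per_page: int = 20) -> PaginationParams:
    # ?page=2&per_page=20 query param'laridan keladi
    page = max(page, 1)                      # 0 yoki manfiy page — 1 ga to'g'rilanadi
    per_page = min(max(per_page, 1), 100)    # abuse'dan himoya: max 100
    return PaginationParams(page=page, per_page=per_page)

p = get_pagination(page=2, per_page=20)
print(p.limit, p.offset)   # → 20 20
```

Endpoint'da `pagination: PaginationParams = Depends(get_pagination)` deb yoziladi — bir marta yozilgan logika hamma ro'yxat endpoint'larida qayta ishlatiladi.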

3. Service qatlami — app/services/order_service.py

Bu qatlam — ilovaning yuragi. Butun biznes logika shu yerda jamlanadi. HTTP haqida hech narsa bilmaydi — agar ertaga API'dan voz kechib CLI yoki GraphQL ga o'tsak, service tegmaydi. Avval konstruktor:

order_service.py — dependency injection
from app.repositories.order_repo import OrderRepository
from app.repositories.product_repo import ProductRepository
from app.repositories.wallet_repo import WalletRepository
from app.services.event_bus import EventBus
from app.models.domain import Order, User, OrderStatus
from app.models.schemas import OrderCreate
from app.exceptions import InsufficientStockError, InsufficientFundsError

class OrderService:
    def __init__(
        self,
        order_repo: OrderRepository,
        product_repo: ProductRepository,
        wallet_repo: WalletRepository,
        event_bus: EventBus,
    ):
        self.orders = order_repo
        self.products = product_repo
        self.wallets = wallet_repo
        self.events = event_bus

Nima uchun shunday? Service hech narsani o'zi yaratmaydi — barcha bog'liqliklar tashqaridan beriladi (Dependency Injection). Bu testing uchun muhim: test vaqtida haqiqiy DB o'rniga FakeOrderRepository inject qilish mumkin. Bundan tashqari, agar ertaga PostgreSQL'dan MongoDB'ga o'tsak, faqat repository'ni almashtirish kifoya — service kodida o'zgarish bo'lmaydi.

Keyin asosiy metod — buyurtma yaratish. Qadamlarga bo'lib ko'ramiz. Birinchi, mahsulotni tekshirish:

order_service.py — 1. validation
async def create_order(self, data: OrderCreate, user: User) -> Order:
    # 1. Mahsulot bor-yo'qligini tekshirish
    product = await self.products.get_by_id(data.product_id)
    if not product:
        raise NotFoundError(f"Product {data.product_id} not found")

    # 2. Omborda yetarli ekanligini tekshirish
    if product.stock < data.quantity:
        raise InsufficientStockError(
            f"Only {product.stock} available, requested {data.quantity}"
        )

Birinchi qadam — ma'lumotlar to'g'riligini tekshirish. Mahsulot bormi? Ombor yetarlimi? Agar biror tekshiruvdan o'tmasa — domain exception otiladi. Bu exception'lar keyinchalik API qatlamida 404 yoki 409 response'ga aylantiriladi, lekin service o'zi HTTP status code haqida hech narsa bilmaydi.
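Domain exception → HTTP status mapping taxminan shunday ko'rinadi — FastAPI'da bu `@app.exception_handler(...)` orqali ro'yxatdan o'tkaziladi; status kodlari tanlovi shartli misol:

```python
# Domain exception'lar — HTTP haqida hech narsa bilmaydi
class NotFoundError(Exception): ...
class InsufficientStockError(Exception): ...
class InsufficientFundsError(Exception): ...

# Mapping faqat API qatlamida yashaydi
EXCEPTION_STATUS = {
    NotFoundError: 404,           # resurs topilmadi
    InsufficientStockError: 409,  # conflict — ombor holati mos emas
    InsufficientFundsError: 402,  # payment required
}

def to_http_status(exc: Exception) -> int:
    return EXCEPTION_STATUS.get(type(exc), 500)   # noma'lum xato → 500

print(to_http_status(NotFoundError("x")))   # → 404
```

Shu tufayli service qatlami toza qoladi: u faqat domain xatolarini otadi, HTTP'ga tarjima esa bitta joyda markazlashgan.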

Ikkinchi qadam — biznes qoidalarini qo'llash (narx va chegirma):

order_service.py — 2. pricing
    # 3. Narx va chegirma hisoblash
    total = product.price * data.quantity
    if user.is_premium:
        total *= Decimal("0.9")   # Premium foydalanuvchilarga 10% chegirma

    # 4. Hamyondagi pul yetarli ekanligini tekshirish
    wallet = await self.wallets.get_by_user(user.id)
    if wallet.balance < total:
        raise InsufficientFundsError(
            f"Balance: {wallet.balance}, required: {total}"
        )

Diqqat qiling: chegirma qoidasi (premium users get 10% off) faqat shu yerda. API handlerda yo'q, controllerda yo'q, repository'da yo'q. Agar ertaga chegirma 15% bo'lsa yoki yangi tier qo'shilsa — faqat shu joyni o'zgartiramiz. Bu — single source of truth tamoyili.

Uchinchi qadam — saqlash, lekin eng muhim jihati bilan: transaction ichida. Bu yerda atomicity muhim — yoki hammasi saqlanadi, yoki hech biri:

order_service.py — 3. atomic transaction
    # 5. Transaction ichida saqlash (ATOMIC!)
    async with self.orders.transaction():
        order = await self.orders.create(
            user_id=user.id,
            product_id=data.product_id,
            quantity=data.quantity,
            total=total,
            status=OrderStatus.PENDING,
        )
        await self.products.decrement_stock(data.product_id, data.quantity)
        await self.wallets.debit(user.id, total)

Nima uchun transaction muhim? Tasavvur qiling: buyurtma yaratildi, stok kamaytirildi, lekin hamyondan pul olish vaqtida xato chiqdi. Transactionsiz: buyurtma bor, stok kamaygan, lekin pul o'tmagan — mijoz tekin mahsulot oladi. Transaction bilan: yoki uch amal ham muvaffaqiyatli bajariladi, yoki bironi ham bajarilmaydi. Bu — ACID'ning Atomicity xossasi.

Oxirgi qadam — event chiqarish:

order_service.py — 4. publish event
    # 6. Async events — response foydalanuvchiga ketgandan keyin ishlaydi
    await self.events.publish("order.created", {
        "order_id": str(order.id),
        "user_id": str(user.id),
        "total": float(total),
    })

    return order

Nega event kerak? Buyurtma yaratilishi natijasida ko'p narsa bo'lishi kerak: mijozga email, omborxonaga notifikatsiya, ML model'ga data, analytics'ga metric. Agar hammasini shu metod ichida sinxron qilsak — 5 sekund kutadi foydalanuvchi. Event bilan: service faqat "buyurtma yaratildi" deb e'lon qiladi, qolganlar o'z vaqtida, o'z tezligida bajaradi. Bu — event-driven architecture.
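Iste'molchi tomoni qanday ko'rinishini soddalashtirilgan in-memory dispatcher bilan ko'rsatish mumkin — real tizimda bu Redis Streams yoki Kafka bo'lardi, handler nomlari esa faraziy:

```python
import asyncio

SENT: list[str] = []   # real tizimda — email service, analytics va h.k.

async def send_confirmation_email(payload: dict) -> None:
    SENT.append(f"email:{payload['order_id']}")

async def update_analytics(payload: dict) -> None:
    SENT.append(f"analytics:{payload['total']}")

# Topic → handler'lar ro'yxati; real bus (Redis Streams, Kafka) o'rniga
HANDLERS = {
    "order.created": [send_confirmation_email, update_analytics],
}

async def dispatch(topic: str, payload: dict) -> None:
    # Har handler mustaqil: biri yiqilsa ham, boshqalari ishlayveradi
    await asyncio.gather(
        *(h(payload) for h in HANDLERS.get(topic, [])),
        return_exceptions=True,
    )
```

Service faqat publish qiladi — qaysi handler'lar borligini bilmaydi. Yangi iste'molchi qo'shish uchun service kodiga tegish shart emas.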

4. Repository qatlami — app/repositories/order_repo.py

Bu qatlamning yagona vazifasi — DB bilan ishlash. Hech qanday biznes logika yo'q. Faqat SQL query, transaction, caching. Agar service "buyurtmani saqla" desa — repository aynan saqlaydi, nima uchunligini so'ramaydi. Avval sinf:

order_repo.py — struktura
from contextlib import asynccontextmanager
from decimal import Decimal
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, func
from app.models.domain import Order, OrderStatus

class OrderRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    @asynccontextmanager
    async def transaction(self):
        """Atomic operation uchun transaction wrapper"""
        async with self.session.begin():
            yield

Repository SQLAlchemy session ni oladi. Session — bu bitta request doirasidagi DB context. U orqali SELECT, INSERT, UPDATE qilinadi. transaction() metodi — context manager. Uni service'ning async with bloki ichida ishlatganda, ichidagi barcha amallar bir transaction'da bajariladi.

Yangi buyurtma saqlash metodi:

order_repo.py — create
    async def create(
        self,
        user_id: UUID,
        product_id: UUID,
        quantity: int,
        total: Decimal,
        status: OrderStatus,
    ) -> Order:
        order = Order(
            user_id=user_id,
            product_id=product_id,
            quantity=quantity,
            total=total,
            status=status,
        )
        self.session.add(order)
        await self.session.flush()    # INSERT execute, lekin commit emas
        await self.session.refresh(order)   # DB'dan id, created_at ni olish
        return order

flush() va commit() farqini bilish muhim: flush — SQL'ni DB'ga yuboradi, lekin transactionni yopmaydi (id, timestamp default qiymatlari tayyor bo'ladi, lekin rollback qilish hali mumkin). Commit — transactionni yakunlaydi. Biz service'dagi async with transaction() bloki tugaganda — o'shanda avtomatik commit bo'ladi. Bu yondashuv: birinchi flush, tekshiruv, keyin commit — xavfsizroq.

Ro'yxat olish metodi — ikki query'ni parallel bajarish bilan:

order_repo.py — list with pagination
    async def list_by_user(
        self, user_id: UUID, limit: int = 20, offset: int = 0,
    ) -> tuple[list[Order], int]:
        # Data query
        data_query = (
            select(Order)
            .where(Order.user_id == user_id)
            .order_by(Order.created_at.desc())
            .limit(limit).offset(offset)
        )
        # Count query (pagination uchun total kerak)
        count_query = (
            select(func.count(Order.id))
            .where(Order.user_id == user_id)
        )

        # Bitta AsyncSession bir vaqtning o'zida faqat bitta query bajara oladi —
        # shuning uchun ketma-ket execute qilamiz
        data_result = await self.session.execute(data_query)
        count_result = await self.session.execute(count_query)

        orders = list(data_result.scalars().all())
        total = count_result.scalar_one()
        return orders, total

Nima uchun ketma-ket? Bitta AsyncSession (ya'ni bitta DB connection) ustida ikki query'ni asyncio.gather bilan bir vaqtda yuborib bo'lmaydi — SQLAlchemy session concurrent ishlatishga mo'ljallanmagan, bunday urinish InvalidRequestError bilan tugaydi. Haqiqiy parallellik kerak bo'lsa, har query alohida session (alohida connection) olishi kerak. Async Python'ning kuchi esa boshqa joyda: query DB'da bajarilayotganda event loop boshqa request'larga xizmat qilishda davom etadi — sync kodda bu imkon yo'q.
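Haqiqiy parallellik kerak bo'lganda (masalan, bir-biriga bog'liq bo'lmagan read-only query'lar) har biri o'z session'ini olishi kerak. Quyidagi umumlashtirilgan sketch'da session_factory va FakeSession faraziy — real loyihada bu async_sessionmaker bo'lardi:

```python
import asyncio

async def run_parallel(session_factory, queries: list):
    """Har query o'z session'ida (o'z connection'ida) bajariladi —
    shundagina asyncio.gather haqiqiy parallellik beradi."""
    async def run_one(query):
        async with session_factory() as session:
            return await session.execute(query)

    return await asyncio.gather(*(run_one(q) for q in queries))
```

Eslatma: bu usulda query'lar bitta transaction'da emas — shuning uchun u faqat izolyatsiya muhim bo'lmagan o'qish holatlari uchun to'g'ri keladi.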

5. Dependency graph — app/dependencies.py

FastAPI'ning eng kuchli xususiyatlaridan biri — avtomatik dependency resolution. Siz shunchaki "menga OrderService kerak" deysiz; FastAPI o'zi avval session'ni yaratadi, undan OrderRepository, ProductRepository va WalletRepository'ni, ulardan esa (EventBus bilan birga) OrderService'ni qurib chiqadi. Dependency graph'ni FastAPI avtomatik hal qiladi.

Birinchi — eng asosiy dependency, DB session:

dependencies.py — DB session
from collections.abc import AsyncIterator
from fastapi import Depends, Request
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import SessionLocal

async def get_db() -> AsyncIterator[AsyncSession]:
    async with SessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Bu generator funksiyasi (yield ishlatilgan, return emas). yield sessiongacha bo'lgan qism request boshlanganda ishlaydi — session yaratiladi. yielddan keyingi qism request tugagach ishlaydi — agar xato bo'lmagan bo'lsa commit, xato bo'lsa rollback. Bu pattern har endpoint uchun o'z-o'zidan transaction management beradi.
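Bu lifecycle'ni FastAPI'siz, sof Python'da soddalashtirib ko'rsatish mumkin — FastAPI aslida shu generator'ni request boshida yurgizib, request oxirida yakunlaydi:

```python
EVENTS: list[str] = []   # bajarilish tartibini kuzatish uchun

def get_resource():
    EVENTS.append("setup")         # request boshlanishida ishlaydi
    try:
        yield "resource"           # shu qiymat handler'ga beriladi
        EVENTS.append("commit")    # handler xatosiz tugadi
    except Exception:
        EVENTS.append("rollback")  # handler ichida xato bo'ldi
        raise

# FastAPI qiladigan ishning soddalashtirilgan tartibi:
gen = get_resource()
resource = next(gen)               # 1) setup — resource olinadi
# ... shu yerda handler ishlaydi ...
try:
    next(gen)                      # 2) request tugadi — yield'dan keyingi qism
except StopIteration:
    pass
```

Xato holatida FastAPI generator ichiga exception'ni "otadi" — shunda except bloki (rollback) ishlaydi.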

Foydalanuvchini JWT'dan olish:

dependencies.py — authenticated user
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi import HTTPException
from uuid import UUID
import jwt
from app.config import settings
from app.models.domain import User

security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.JWT_SECRET,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except jwt.ExpiredSignatureError:
        raise HTTPException(401, "Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(401, "Invalid token")

    user = await db.get(User, UUID(payload["sub"]))
    if not user:
        raise HTTPException(401, "User not found")
    return user

Bu funksiya ikkita narsaga bog'liq: HTTPBearer (Authorization header'dan token'ni oladi) va get_db (DB'dan user'ni topish uchun). Bu bog'liqliklarni FastAPI avtomatik hal qiladi: qachon endpoint Depends(get_current_user) so'rasa, FastAPI avval DB session yaratadi, keyin token'ni o'qiydi, validate qiladi va user'ni DB'dan topib beradi.

Service'larning o'zi — bir necha repository'dan yasaladi:

dependencies.py — service wiring
async def get_order_repo(db: AsyncSession = Depends(get_db)) -> OrderRepository:
    return OrderRepository(db)

async def get_product_repo(db: AsyncSession = Depends(get_db)) -> ProductRepository:
    return ProductRepository(db)

async def get_wallet_repo(db: AsyncSession = Depends(get_db)) -> WalletRepository:
    return WalletRepository(db)

def get_event_bus(request: Request) -> EventBus:
    # app.state'dagi singleton — Request orqali olinadi
    return request.app.state.event_bus

async def get_order_service(
    order_repo: OrderRepository = Depends(get_order_repo),
    product_repo: ProductRepository = Depends(get_product_repo),
    wallet_repo: WalletRepository = Depends(get_wallet_repo),
    event_bus: EventBus = Depends(get_event_bus),
) -> OrderService:
    return OrderService(
        order_repo=order_repo,
        product_repo=product_repo,
        wallet_repo=wallet_repo,
        event_bus=event_bus,
    )

Diqqat: uchala repository bir xil session'ni oladi. FastAPI'da bir request doirasida bir xil dependency (get_db) bir marta chaqiriladi, natija cache'lanadi (use_cache=True default) va hamma joyda o'sha session ishlatiladi. Bu muhim — transaction bitta connection'da bo'lishi kerak. Agar har repository alohida session olsa — transaction ishlamaydi.

6. Pydantic schemas — app/models/schemas.py

Schema'lar — API kontrakti. Ular ikkiga bo'linadi: input (client → server) va output (server → client). DB model'dan alohida saqlash — qattiq qoida:

schemas.py — Input
from pydantic import BaseModel, Field
from uuid import UUID

class OrderCreate(BaseModel):
    """Client'dan kelayotgan data — faqat kerakli fieldlar"""
    product_id: UUID
    quantity: int = Field(..., ge=1, le=100, description="1-100 oralig'ida")

E'tibor bering: OrderCreate'da user_id yo'q. Xavfsizlik uchun: user o'z ID'sini yubora olmaydi — biz uni JWT'dan olamiz. Agar user_id field'da bo'lsa, hacker boshqa user nomidan buyurtma yaratishi mumkin. total ham yo'q — uni server o'zi hisoblaydi (premium chegirma va h.k.). Client faqat o'zi bilishi kerak bo'lgan narsalarni yuboradi.

schemas.py — Output
from pydantic import ConfigDict
from datetime import datetime
from decimal import Decimal
from typing import Self
from app.models.domain import Order, OrderStatus

class OrderResponse(BaseModel):
    """Client'ga qaytariladigan data"""
    model_config = ConfigDict(from_attributes=True)

    id: UUID
    product_id: UUID
    quantity: int
    total: Decimal
    status: OrderStatus
    created_at: datetime
    # user_id qaytarilmaydi — user o'ziniki ekanini biladi
    # internal_notes qaytarilmaydi — bu admin uchun

    @classmethod
    def from_domain(cls, order: Order) -> Self:
        return cls.model_validate(order)

from_attributes=True — Pydantic'ga aytadi: "ORM obyektidan atributlarni o'zing olaver". Ya'ni Order ORM obyektidan OrderResponse'ni model_validate bilan yaratish mumkin. DB'da password_hash, internal_notes field'lari bo'lsa ham — ular bu schema'da yo'q, shuning uchun javobga tushmaydi. Bu — muhim xavfsizlik tamoyili: foydalanuvchiga data qaytarganda har field'ni explicit ravishda sanab o'ting, aks holda yashirin ma'lumotlar tasodifan chiqib ketishi mumkin.

7. Testing — unit test mock bilan

Modular arxitekturaning eng katta mukofoti — tez va ishonchli testing. Service qatlamini real DB'siz, minglab marta sekundida test qilamiz. Avval fixture:

test_order_service.py — setup
import pytest
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4
from app.services.order_service import OrderService
from app.models.schemas import OrderCreate
from app.models.domain import Order, OrderStatus, Product, User, Wallet
from app.exceptions import InsufficientStockError

@pytest.fixture
def mock_dependencies():
    """Barcha dependencies — fake obyektlar"""
    order_repo = AsyncMock()
    # transaction() sync chaqiriladi va async context manager qaytaradi —
    # MagicMock buni avtomatik qo'llab-quvvatlaydi (__aenter__/__aexit__).
    # AsyncMock'da qoldirilsa, transaction() coroutine qaytarib, async with buziladi.
    order_repo.transaction = MagicMock()
    return {
        "order_repo": order_repo,
        "product_repo": AsyncMock(),
        "wallet_repo": AsyncMock(),
        "event_bus": AsyncMock(),
    }

@pytest.fixture
def service(mock_dependencies):
    return OrderService(**mock_dependencies)

AsyncMock — barcha metodlari async bo'lgan fake obyekt. Har metodni chaqirganda — uning nima qaytarishini biz belgilaymiz. Bu real DB o'rniga ishlaydi: bir millisekundda javob beradi, diskka hech narsa yozmaydi, testlar bir-biriga ta'sir qilmaydi.
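AsyncMock'ning asosiy xulq-atvorini kichik mustaqil misolda ko'rish mumkin — repo va get_by_id bu yerda faraziy nomlar:

```python
import asyncio
from unittest.mock import AsyncMock

repo = AsyncMock()
# Har metodning javobini biz belgilaymiz
repo.get_by_id.return_value = {"id": "p1", "stock": 10}

async def demo():
    product = await repo.get_by_id("p1")   # await qilinadi — chunki AsyncMock
    return product["stock"]

stock = asyncio.run(demo())
repo.get_by_id.assert_awaited_once_with("p1")   # chaqiruv parametrlari tekshiruvi
```

Real DB'ga ulanish yo'q, disk yo'q — shu sababli bunday testlar millisekundlarda o'tadi.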

Birinchi test — muvaffaqiyatli stsenariy:

test — happy path
@pytest.mark.asyncio
async def test_success_with_enough_stock_and_balance(
    service, mock_dependencies
):
    # Arrange — fake ma'lumotlarni tayyorlash
    product_id, user_id = uuid4(), uuid4()
    product = Product(id=product_id, price=Decimal("100"), stock=10)
    wallet = Wallet(user_id=user_id, balance=Decimal("500"))
    user = User(id=user_id, is_premium=False)

    mock_dependencies["product_repo"].get_by_id.return_value = product
    mock_dependencies["wallet_repo"].get_by_user.return_value = wallet
    mock_dependencies["order_repo"].create.return_value = Order(
        id=uuid4(), user_id=user_id, product_id=product_id,
        quantity=3, total=Decimal("300"), status=OrderStatus.PENDING,
    )

    # Act — tekshiriladigan amalni bajarish
    data = OrderCreate(product_id=product_id, quantity=3)
    order = await service.create_order(data, user)

    # Assert — kutilgan natijani tekshirish
    assert order.quantity == 3
    mock_dependencies["product_repo"].decrement_stock.assert_called_with(product_id, 3)
    mock_dependencies["wallet_repo"].debit.assert_called_with(user_id, Decimal("300"))
    mock_dependencies["event_bus"].publish.assert_called_once()

AAA pattern (Arrange-Act-Assert) — test yozishning klassik yo'li. Arrange: fake obyektlar va ularning javoblarini tayyorlaymiz (OrderCreate UUID talab qilgani uchun id'lar uuid4() bilan yaratiladi). Act: sinab ko'rmoqchi bo'lgan metodni chaqiramiz. Assert: natijani va mock'larga qilingan chaqiruvlarni tekshiramiz. assert_called_with — "bu metod aynan shu parametrlar bilan chaqirilganmi?" degan savolga javob beradi, ya'ni qatlamlar orasidagi integratsiyani tekshiradi.

Ikkinchi test — exception scenariy:

test — stock yetmaganda
@pytest.mark.asyncio
async def test_raises_if_stock_insufficient(service, mock_dependencies):
    product_id = uuid4()
    mock_dependencies["product_repo"].get_by_id.return_value = Product(
        id=product_id, price=Decimal("100"), stock=2   # faqat 2 dona bor
    )

    # Quantity 5 — stock'dan ko'p
    with pytest.raises(InsufficientStockError):
        await service.create_order(
            OrderCreate(product_id=product_id, quantity=5),
            User(id=uuid4(), is_premium=False),
        )

Bu test bizning qoidamizni himoya qiladi: "stokdan ko'p buyurtma qabul qilinmaydi". Agar kelajakda kimdir qoidani noto'g'ri o'zgartirib yuborsa — test darhol fail bo'ladi va xato kodning merge bo'lishiga yo'l qo'ymaydi.

Uchinchi test — premium discount qoidasini sinab ko'ramiz:

test — premium chegirma
@pytest.mark.asyncio
async def test_premium_user_gets_10_percent_discount(
    service, mock_dependencies
):
    product_id, user_id = uuid4(), uuid4()
    mock_dependencies["product_repo"].get_by_id.return_value = Product(
        id=product_id, price=Decimal("100"), stock=10
    )
    mock_dependencies["wallet_repo"].get_by_user.return_value = Wallet(
        user_id=user_id, balance=Decimal("1000")
    )
    user = User(id=user_id, is_premium=True)   # ← PREMIUM user

    await service.create_order(OrderCreate(product_id=product_id, quantity=2), user)

    # 100 * 2 * 0.9 = 180 (not 200)
    mock_dependencies["wallet_repo"].debit.assert_called_with(
        user_id, Decimal("180.0")
    )

Bitta test bitta xulosa tamoyili: har test faqat bir biznes qoidani tekshiradi. "Premium user 10% chegirma oladi" — alohida test. "Stok yetmasa exception" — alohida test. Shunday qilib, test fail bo'lsa — aniq qaysi qoida buzilganini darhol bilasiz.

FastAPI mastery — asosiy tamoyillar

1. Handler yupqa bo'lsin. API layer'da biznes logika yo'q — faqat service chaqirish. Agar handler 20 qatordan oshsa — service'ga ko'chirish kerak.

2. Dependency injection keng ishlatilsin. Har resurs (DB, cache, service) Depends() orqali keladi — testing uchun fake qo'yish oson.

3. Pydantic schema = API kontrakt. DB model'dan alohida. Input va output schema'lar alohida bo'lishi ham mumkin.

4. Exception'lar domain'da tug'iladi. Service InsufficientStockError raise qiladi — API layer'dagi global exception handler buni 409 response'ga aylantiradi.

5. Test unit + integration. Unit testlar service'ni fake repo bilan (tez), integration testlar real DB bilan (sekin, lekin real).

09

devops & cloud — docker, kubernetes, ci/cd

9.1

docker — multi-stage, security, optimization

Production-grade Dockerfile qanday yoziladi?
javob
Dockerfile
# syntax=docker/dockerfile:1.7

# Stage 1: builder
FROM python:3.12-slim AS builder

WORKDIR /build

# System deps faqat build uchun
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# uv — yangi tez package manager (2026 default)
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uv

# Dependencies — alohida layer (cache optimization)
COPY pyproject.toml uv.lock ./
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --frozen --no-dev --no-install-project

# Stage 2: runtime
FROM python:3.12-slim AS runtime

# Non-root user
RUN groupadd -r app && useradd -r -g app -d /app -s /bin/bash app

WORKDIR /app

# System deps (minimal)
RUN apt-get update && apt-get install -y --no-install-recommends \
    libpq5 curl \
    && rm -rf /var/lib/apt/lists/* \
    && apt-get clean

# Dependencies builder'dan ko'chiriladi
COPY --from=builder /build/.venv /app/.venv
ENV PATH="/app/.venv/bin:$PATH"
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

# App code
COPY --chown=app:app . .

USER app
EXPOSE 8000

HEALTHCHECK --interval=30s --timeout=5s --start-period=30s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]

Docker optimization tips

  • Multi-stage build — runtime image'ga faqat kerakli fayllar kiradi. 500 MB → 150 MB.
  • .dockerignore — __pycache__/, .git/, tests/, .env
  • Non-root user — security baseline
  • Layer caching — dependencies copy kodidan oldin
  • BuildKit cache mount — tez qayta build
  • Distroless images — gcr.io/distroless/python3 (yana kichikroq, xavfsizroq)
9.2

kubernetes — production deployment

K8s'da ilovani qanday to'g'ri deploy qilish?
javob
Kubernetes nima va nima uchun kerak?

Kubernetes (K8s) — container'larni boshqarish tizimi. Tasavvur qiling: 50 server, 200 container, har biri har xil CPU/memory kerak, traffic birdaniga 10x oshdi, bitta server o'chib qoldi — bularni qo'lda boshqarish mumkin emas. K8s avtomatik bajaradi: container'larni serverlarga joylashtiradi (scheduling), o'lik container'larni qayta ishga tushiradi (self-healing), traffic oshganda yangi nusxa yaratadi (auto-scaling), traffic kamaysa o'chiradi (scale down).

Declarative model: siz "holatni" tasvirlaysiz (YAML'da), K8s shu holatga yetkazadi va saqlab turadi. "3 ta replica bo'lsin" → bitta o'chsa, K8s darhol yangi birini ishga tushiradi.

K8s resource'lar — hierarchy

Pod                     ← eng kichik birlik, 1+ container
  ↑
Deployment              ← declarative pod management, rolling update
  ↑
Service                 ← network endpoint, load balance
  ↑
Ingress                 ← external HTTP routing
+ ConfigMap, Secret     ← configuration
+ HPA                   ← auto scaling
+ PodDisruptionBudget   ← availability during deployments
+ NetworkPolicy         ← network isolation
deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api
  labels: { app: api, tier: backend }
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0       # downtime yo'q
  selector:
    matchLabels: { app: api }
  template:
    metadata:
      labels: { app: api, tier: backend }
    spec:
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: kubernetes.io/hostname
          whenUnsatisfiable: ScheduleAnyway
          labelSelector:
            matchLabels: { app: api }
      containers:
      - name: api
        image: registry.internal/api:v1.42.0
        ports: [{ containerPort: 8000 }]
        
        # Resource limits — MUST have
        resources:
          requests: { cpu: "250m", memory: "512Mi" }
          limits:   { cpu: "1000m", memory: "1Gi" }
        
        # Probes — 3 ta turi
        startupProbe:
          httpGet: { path: /health, port: 8000 }
          failureThreshold: 30
          periodSeconds: 2
        livenessProbe:
          httpGet: { path: /health, port: 8000 }
          periodSeconds: 15
          failureThreshold: 3
        readinessProbe:
          httpGet: { path: /ready, port: 8000 }
          periodSeconds: 5
          failureThreshold: 2
        
        env:
          - name: DATABASE_URL
            valueFrom: { secretKeyRef: { name: db, key: url } }
          - name: POD_NAME
            valueFrom: { fieldRef: { fieldPath: metadata.name } }
        
        lifecycle:
          preStop:
            exec:
              command: ["sh", "-c", "sleep 15"]   # graceful drain

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata: { name: api }
spec:
  scaleTargetRef: { apiVersion: apps/v1, kind: Deployment, name: api }
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource: { name: cpu, target: { type: Utilization, averageUtilization: 70 } }
    - type: Pods
      pods:
        metric: { name: http_requests_per_second }
        target: { type: AverageValue, averageValue: "100" }
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300   # 5 daqiqa kutib tursin

Probes — hayotiy muhim

Probe            Nima qiladi                   Fail bo'lsa
startupProbe     Pod ishga tushyaptimi?        Container restart (liveness hali ishga tushmagan)
livenessProbe    Pod tirikmi?                  Container restart
readinessProbe   Trafik qabul qila oladimi?    Service endpoint'dan olib tashlanadi

Deployment strategies

Rolling (default) — balanced
  • No downtime
  • Resource efficient
  • Slow rollback
Blue-Green — instant rollback
  • Instant switch
  • 2x resource
  • Migration complexity

Canary — eng yaxshi. 5% → 25% → 50% → 100% traffic. Har step'da metrics'ni kuzatib. Flagger, Argo Rollouts bu ishni avtomatlashtiradi.

9.3

ci/cd — github actions production pipeline

Professional CI/CD pipeline qanday tuziladi?
javob
.github/workflows/deploy.yml
name: deploy
on:
  push: { branches: [main] }

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:16-alpine
        env: { POSTGRES_PASSWORD: test }
        options: --health-cmd "pg_isready"
      redis:
        image: redis:7-alpine
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v3
      - run: uv sync --frozen
      - run: uv run ruff check .
      - run: uv run mypy app/
      - run: uv run pytest --cov=app --cov-fail-under=80
  
  security:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: aquasecurity/trivy-action@master
        with: { scan-type: 'fs', exit-code: '1', severity: 'CRITICAL,HIGH' }
      - uses: snyk/actions/python@master
        env: { SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }} }
  
  build:
    needs: [test, security]
    runs-on: ubuntu-latest
    permissions: { contents: read, packages: write, id-token: write }
    steps:
      - uses: actions/checkout@v4
      - uses: docker/setup-buildx-action@v3
      - uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - uses: docker/metadata-action@v5
        id: meta
        with:
          images: ghcr.io/${{ github.repository }}
          tags: |
            type=sha,format=long
            type=ref,event=branch
      - uses: docker/build-push-action@v5
        with:
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
          provenance: true       # SLSA provenance
          sbom: true             # Software Bill of Materials
  
  deploy:
    needs: build
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - uses: azure/setup-kubectl@v4
      - run: |
          kubectl set image deployment/api \
            api=ghcr.io/${{ github.repository }}:sha-${{ github.sha }}
          kubectl rollout status deployment/api --timeout=5m
Production best practices

SBOM (Software Bill of Materials) — supply chain security
Provenance attestation — SLSA Level 3
OIDC federation — long-lived secret'larsiz cloud'ga deploy
Progressive delivery — Argo Rollouts/Flagger bilan canary

10

observability & reliability

10.1

3 pillars — logs, metrics, traces

Production tizimda nima sodir bo'layotganini qanday bilish?
javob
3 pillar'ning farqi

Metrics — "nimadir buzuldi" (RPS, latency, error rate).
Traces — "qayerda buzuldi" (request yo'li, qaysi service sekinlashdi).
Logs — "nima uchun buzuldi" (aniq detail, stack trace, input).

Structured logging

logging.py
import structlog
import logging

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.dict_tracebacks,
        structlog.processors.JSONRenderer(),  # JSON output
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# Ishlatish — ALWAYS with context
logger.info(
    "order.created",
    order_id=str(order.id),
    tenant_id=str(order.tenant_id),
    user_id=str(order.user_id),
    amount=float(order.total),
    currency=order.currency,
    trace_id=get_current_trace_id(),
)

# Output (JSON):
# {"event": "order.created", "order_id": "...", "tenant_id": "...",
#  "timestamp": "2026-04-21T10:23:45Z", "level": "info", ...}

Prometheus metrics

metrics.py
from prometheus_client import Counter, Histogram, Gauge, Info

# RED method: Rate, Errors, Duration
http_requests = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status', 'tenant']
)

http_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint'],
    buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

# USE method: Utilization, Saturation, Errors
active_connections = Gauge(
    'db_connections_active',
    'Active DB connections',
    ['pool']
)

queue_depth = Gauge('celery_queue_depth', 'Tasks in queue', ['queue'])

# Business metrics
orders_created = Counter(
    'orders_created_total',
    'Orders created',
    ['tenant', 'plan', 'currency']
)

app_info = Info('app_version', 'Application version')
app_info.info({'version': '1.42.0', 'build': 'a1b2c3'})

Distributed tracing — OpenTelemetry

tracing.py
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Setup
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(
    OTLPSpanExporter(endpoint="http://otel-collector:4317")
))
trace.set_tracer_provider(provider)

# Auto-instrument frameworks
FastAPIInstrumentor.instrument_app(app)
SQLAlchemyInstrumentor().instrument(engine=engine)
HTTPXClientInstrumentor().instrument()

# Custom span'lar
tracer = trace.get_tracer(__name__)

@app.post("/orders")
async def create_order(data: OrderCreate):
    with tracer.start_as_current_span("create_order") as span:
        span.set_attribute("user.id", str(data.user_id))
        span.set_attribute("order.items_count", len(data.items))
        
        with tracer.start_as_current_span("validate_inventory"):
            await check_inventory(data.items)
        
        with tracer.start_as_current_span("charge_payment"):
            charge = await stripe_charge(data.total)
            span.set_attribute("charge.id", charge.id)
        
        with tracer.start_as_current_span("save_order"):
            order = await db.save_order(data)
        
        span.set_attribute("order.id", str(order.id))
        return order

SRE practices — chaos engineering

Chaos engineering — production standard

Netflix Chaos Monkey'dan boshlangan. Ideya: production'da atayin failure yaratib ko'rish. Region down, DB lag, network partition. Chaos Mesh, Litmus, Gremlin — tools. Purpose: resilience to'liq test qilib bo'lmaydi, lekin uni simulyatsiya qilish mumkin.

Incident response — production runbook

Alert chalinyapti (PagerDuty/Opsgenie):
─────────────────────────────────────────
1. ACKNOWLEDGE (<2 min) — alert'ni qabul qilish
2. ASSESS (<5 min) — severity, scope, user impact
3. COMMUNICATE — status page, Slack #incident
4. MITIGATE — to'xtatish (rollback > fix-forward)
5. VERIFY — metric'lar yashil
6. POSTMORTEM (24-48h) — blameless root cause

Severity levels:
SEV-1: major outage, >25% users → all hands, status page
SEV-2: partial outage, feature broken → on-call team
SEV-3: minor issue, workaround bor → working hours
Part IV

ai architecture — llm, rag, agents, advanced patterns

Zamonaviy AI — passiv chatbot emas, aktiv ish bajaruvchi komponent. LLM fundamentals, barcha RAG arxitekturalari, vector search, fine-tuning, agentic AI, multi-agent systems, MCP, GraphRAG, AI Memory, SLM va MoE — to'liq AI muhandislik bo'limi. Bu qism — butun qo'llanmaning yuragi.

11

llm fundamentals — how language models work

11.1

llm qanday ishlaydi — engineering perspective

LLM black box emas — uni tushunish kerak. Qanday ishlaydi?
javob
LLM — keyingi so'z bashoratchisi, lekin juda aqlli

LLM (Large Language Model) — juda katta (100B+ parametr) transformer neural network. Asosiy vazifa oddiy: berilgan matn (context) keyin qanday token kelishini bashorat qilish. "Paris is the capital of __" → 99.8% ehtimol "France". Bitta token bashorat qilinadi, u context'ga qo'shiladi, keyingi token bashoratlanadi. Bu cycle — "autoregressive generation" deyiladi.

Paradoks: shu oddiy vazifani trillionlab internet token'i bilan o'rganish natijasida model fikrlash, hisob-kitob, kod yozish, tarjima qilish kabi "tushunish" qobiliyatlarini egallaydi. Shu sababli LLM'lar "emergent capabilities" ko'rsatadi — ular bunga to'g'ridan-to'g'ri o'qitilmagan, lekin bajara oladi.

Context window — LLM'ning "ish xotirasi"

LLM conversation'ni yodda saqlamaydi. U faqat joriy context window'dagi matnni "ko'radi" va unga javob beradi. Har yangi API call'da siz butun conversation history'ni (messages array) yuborasiz — LLM uchun har call "yangi boshlanish". 128K token context window ≈ 100K so'z ≈ 300 sahifa kitob — bu katta, lekin baribir cheklangan. Uzun suhbatda token limit'ga yetilganda eski message'lar truncate qilinishi mumkin.
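Bu "stateless" xatti-harakatni soddalashtirilgan sketch bilan ko'rsatish mumkin — real API call o'rnida faraziy stub ishlatilgan:

```python
history: list[dict] = []

def send_message(user_text: str) -> list[dict]:
    """Har call uchun to'liq messages array quriladi — model hech narsani eslamaydi."""
    history.append({"role": "user", "content": user_text})
    payload = list(history)        # BUTUN history — har safar qayta yuboriladi
    # ... bu yerda real API call bo'lardi ...
    history.append({"role": "assistant", "content": f"javob: {user_text}"})
    return payload

first = send_message("salom")
second = send_message("yana bir savol")
# Ikkinchi call payload'ida birinchi almashinuv ham bor —
# shuning uchun uzun suhbat = har call'da ko'proq token = ko'proq xarajat
```

Shu sababli production chat tizimlari history'ni summarize yoki truncate qilish strategiyasiga ega bo'lishi kerak.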

Token — LLM'ning birligi

"Hello, world!" → tokenizer → [15496, 11, 1917, 0] ↑ har token ~3-4 character, yoki bir so'z, yoki so'z qismi. BPE algoritmi. 1 English word ≈ 1.3 tokens 1 Uzbek word ≈ 2.5 tokens (turk tillari tokenization'ga yomon moslashgan) 1000 words ≈ 1300 tokens (EN) yoki ~2500 tokens (UZ) Pricing odatda 1M token uchun: - Input: $3-15 - Output: $15-75 (ancha qimmat!)

Asosiy parametrlar — API call'da

llm_params.py
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

response = await client.messages.create(
    model="claude-opus-4-7",          # model tanlovi
    max_tokens=1024,                   # response limit (to'xtash uchun)
    temperature=0.7,                   # 0.0 = deterministic, 1.0 = creative
    top_p=0.95,                        # nucleus sampling
    system="Sen professional Python dasturchisisan.",  # persona
    messages=[
        {"role": "user", "content": "Fibonacci function yoz."}
    ],
)

# Key parametrlar:
# temperature — kreativlik darajasi
#   0.0: har safar bir xil javob (code, facts)
#   0.3-0.7: professional ishlar
#   1.0+: kreativ yozish, brainstorm
# 
# top_p — probability mass
#   0.9: faqat top 90% ehtimolli token'lar
# 
# max_tokens — qancha token generate qilsin
#   Kengroq context → qimmat va sekinroq
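temperature parametri matematik jihatdan oddiy: logit'lar temperature'ga bo'linadi, keyin softmax olinadi. Quyidagi sof Python sketch buni soddalashtirilgan ko'rinishda ko'rsatadi:

```python
import math

def softmax_with_temperature(logits: list[float], temperature: float) -> list[float]:
    """Past T — taqsimot o'tkirlashadi (deterministik'ka yaqin),
    yuqori T — taqsimot tekislanadi (ko'proq variatsiya)."""
    scaled = [x / temperature for x in logits]
    m = max(scaled)                          # numeric stability uchun
    exps = [math.exp(x - m) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, 1.0, 0.1]
sharp = softmax_with_temperature(logits, 0.2)   # past T — top token deyarli doim tanlanadi
flat = softmax_with_temperature(logits, 2.0)    # yuqori T — ehtimollar tekisroq
```

Shuning uchun code/facts uchun past temperature, brainstorm uchun yuqori temperature tavsiya qilinadi: past T'da eng ehtimolli token deyarli har doim yutadi.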

Context window — modellarni taqqoslash

Model               Context window   Output max   Notes
GPT-4o              128k tokens      16k          OpenAI
Claude Opus 4.7     200k tokens      8k           Anthropic, reasoning king
Claude Sonnet 4.6   200k             8k           Balanced
Gemini 2.5 Pro      2M tokens        8k           Longest context
Llama 3.3 70B       128k             —            Open-weight, self-host
Mistral Large 2     128k             —            EU, open weights
Context window != context quality

2M context ≠ 2M effective attention. "Lost in the middle" fenomeni: model context o'rtasidagi ma'lumotni chekkalaridagiga qaraganda yomonroq ishlatadi. 50k kerakli context + RAG retrieval > 2M random matn. Shuning uchun context'ni to'g'ri tuzish muhim — bu shunchaki tezlik/narx kompromissi emas, javob sifati masalasi ham.

Prompt engineering — practical patterns

prompts.py
# PATTERN 1 — Role + Task + Format + Examples
SYSTEM_PROMPT = """Sen senior Python backend dasturchi va code reviewer.

Vazifang:
- Berilgan Python kodni ko'rib chiqish
- Security, performance, best practices muammolarini topish  
- Har muammo uchun tuzatilgan versiya berish

Javob formati (JSON):
{
  "issues": [
    {
      "severity": "critical|high|medium|low",
      "category": "security|performance|style|bug",
      "description": "...",
      "line": 15,
      "suggested_fix": "..."
    }
  ],
  "summary": "umumiy baholash"
}

Misollar:
- SQL injection topsang → severity: "critical", category: "security"
- N+1 query topsang → severity: "high", category: "performance"
"""

# PATTERN 2 — Structured output with Pydantic
from typing import Literal
from pydantic import BaseModel

class Issue(BaseModel):
    severity: Literal["critical", "high", "medium", "low"]
    category: Literal["security", "performance", "style", "bug"]
    description: str
    line: int
    suggested_fix: str

class ReviewResult(BaseModel):
    issues: list[Issue]
    summary: str

# Instructor library bilan (Pydantic validate)
import instructor
client = instructor.from_anthropic(AsyncAnthropic())

result: ReviewResult = await client.messages.create(
    model="claude-opus-4-7",
    response_model=ReviewResult,  # auto-validate
    messages=[{"role": "user", "content": code_to_review}]
)

# PATTERN 3 — Chain of Thought
COT_PROMPT = """Muammo: {problem}

Qadam-qadam o'ylang:
1. Masala nima haqida?
2. Qanday ma'lumotlar berilgan?
3. Qanday yondashuvlar mavjud?
4. Har birini qisqacha baholang
5. Eng yaxshi yondashuvni tanlang va bajaring

Xulosa:"""

Function calling — LLM'ning "qo'llari"

function_calling.py
"""Model tool'larni chaqirib real ishlarni bajara oladi"""

tools = [
    {
        "name": "get_weather",
        "description": "Shaharning ob-havo holatini olish",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "Shahar nomi"},
                "units": {"type": "string", "enum": ["celsius", "fahrenheit"]}
            },
            "required": ["city"]
        }
    },
    {
        "name": "search_database",
        "description": "Ichki ma'lumotlar bazasida qidirish",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "limit": {"type": "integer", "default": 10}
            },
            "required": ["query"]
        }
    }
]

async def chat_with_tools(user_message: str):
    messages = [{"role": "user", "content": user_message}]
    
    while True:
        response = await client.messages.create(
            model="claude-opus-4-7",
            max_tokens=1024,
            tools=tools,
            messages=messages,
        )
        
        if response.stop_reason == "end_turn":
            return response.content[0].text
        
        if response.stop_reason == "tool_use":
            # Model tool chaqirishni so'radi
            messages.append({"role": "assistant", "content": response.content})
            
            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    result = await execute_tool(block.name, block.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": json.dumps(result)
                    })
            
            messages.append({"role": "user", "content": tool_results})
            # Loop — model tool natijasi bilan keyingi qadam
12

rag architectures — naive to agentic

12.1

nima uchun rag va qanday ishlaydi

LLM biladi, lekin sizning ma'lumotlaringizni bilmaydi. Qanday qo'shiladi?
javob
RAG = Retrieval-Augmented Generation — asosiy g'oya

LLM (ChatGPT, Claude, Llama) — juda bilimli, lekin bilimi o'zgarmas. Ikkita muammo: (1) Knowledge cutoff — 2023-yil aprelidan keyingi yangilikni bilmaydi; (2) Private data — sizning company hujjatlaringizni, DB'ingizni, internal wiki'ngizni bilmaydi. Fine-tuning ham javob emas — har yangi ma'lumotda qayta train qila olmaysiz.

RAG yechimi: savol kelganda — avval tegishli hujjatlarni topib kelamiz, keyin LLM'ga shunday deymiz: "Mana bu hujjatlar asosida javob ber". LLM endi o'z bilimini emas, berilgan context'ni ishlatadi. Natijada: hallucination kamayadi (manbasi bor javob beradi), private data ishlatiladi, real-time yangilanadi (DB yangisi — darhol ko'rinadi).

Vector search qanday ishlaydi? — Matematik asosi

Oddiy database qidiruvdan farqi: SQL'da WHERE text LIKE '%qidiruv%' — faqat so'zma-so'z moslik. "Raqamli bank" so'rasangiz "digital banking" topilmaydi. Vector search esa ma'noni (semantikani) tushunadigan matematik o'lchov ishlatadi.

Embedding modeli har matnni 1000+ o'lchamli vektorda (son ro'yxati) tasvirlaydi. O'xshash ma'noli matnlar vektori bir-biriga yaqin bo'ladi. "Raqamli bank" va "digital banking" vektorlari bir-biriga yaqin — cosine similarity 0.93 bo'ladi. Savol vektori topilganda, DB'dagi eng yaqin (similar) vektorlarni qidiramiz — bu k-NN search. HNSW algoritmi bu qidiruvni millionlab vektorlarda ham millisekundlarda qiladi.
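Yuqoridagi yaqinlik o'lchovi — cosine similarity — qanday hisoblanishini ko'rsatuvchi minimal, kutubxonasiz sketch (real tizimlarda buni vector DB yoki numpy bajaradi):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Ikki vektor orasidagi cosine similarity: dot(a, b) / (|a| * |b|)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Bir xil yo'nalishdagi vektorlar → 1.0 ga yaqin
print(cosine_similarity([1.0, 2.0, 3.0], [2.0, 4.0, 6.0]))   # ≈ 1.0
# Perpendikulyar (mutlaqo boshqa ma'no) → 0.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))             # 0.0
```

k-NN search aynan shu qiymat bo'yicha eng yaqin vektorlarni topadi; HNSW uni indeks orqali tezlashtiradi.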

Classic RAG pipeline

INDEXING PHASE (offline, bir marta):
Documents (PDF, web, DB) → Chunker (500 token/chunk) → Embedder (text → vector) → Vector DB (Qdrant/pgvector)

QUERY PHASE (real-time, har so'rovda):
User Q → Embed Q (same model) → Similarity (top-k retrieval) → Rerank (cross-encoder) → LLM (generate + cite) → Answer + sources

Naive RAG — eng oddiy implementatsiya

naive_rag.py
from openai import AsyncOpenAI
from qdrant_client import AsyncQdrantClient

openai = AsyncOpenAI()
qdrant = AsyncQdrantClient(url="...")

# ─── INDEXING (offline, once) ───
async def index_document(doc_id: str, content: str, metadata: dict):
    # 1. Chunking
    chunks = chunk_text(content, chunk_size=500, overlap=50)
    
    # 2. Embedding (batch)
    embeddings = await openai.embeddings.create(
        model="text-embedding-3-small",  # 1536 dim
        input=chunks
    )
    
    # 3. Store
    points = [
        {"id": f"{doc_id}_{i}", 
         "vector": emb.embedding,
         "payload": {"doc_id": doc_id, "chunk_index": i, "text": chunk, **metadata}}
        for i, (chunk, emb) in enumerate(zip(chunks, embeddings.data))
    ]
    await qdrant.upsert(collection_name="kb", points=points)

# ─── QUERY (real-time) ───
async def answer_question(question: str, tenant_id: str) -> dict:
    # 1. Embed question (same model as indexing!)
    query_emb = (await openai.embeddings.create(
        model="text-embedding-3-small",
        input=[question]
    )).data[0].embedding
    
    # 2. Retrieve top-k
    results = await qdrant.search(
        collection_name="kb",
        query_vector=query_emb,
        query_filter={"must": [{"key": "tenant_id", "match": {"value": tenant_id}}]},
        limit=5,
        score_threshold=0.7  # past score'larni filter
    )
    
    # 3. Build context
    context = "\n\n".join([
        f"[Source {i+1}] {r.payload['text']}"
        for i, r in enumerate(results)
    ])
    
    # 4. LLM generation
    response = await openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": 
             "Javob bering FAQAT berilgan context asosida. "
             "Agar context yetarli emas — 'Ma'lumot yo'q' deb yozing. "
             "Har faktdan keyin [Source N] manba ko'rsating."},
            {"role": "user", "content": f"Context:\n{context}\n\nSavol: {question}"}
        ],
        temperature=0.1  # faktual javob uchun past
    )
    
    return {
        "answer": response.choices[0].message.content,
        "sources": [{"text": r.payload["text"], "score": r.score, "doc_id": r.payload["doc_id"]} 
                    for r in results]
    }

Chunking strategies — to'g'ri bo'lish kalit

| Strategiya | Tavsif | Qachon |
|---|---|---|
| Fixed size | Har 500 token — kesiladi | Oddiy, ishonchli baseline |
| Semantic | Paragraph/heading bo'yicha | Structured content (docs, articles) |
| Sliding window | 500 token + 50 overlap | Context continuity muhim |
| Sentence-aware | Jumla chegarasida | QA use case |
| Parent-child | Small chunk search → big chunk context | Precision + context balance |
| Document-aware | Full doc + summary | Kichik corpus, to'liq context |
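Yuqoridagi kod namunalarida ishlatilgan `chunk_text` helper'ining taxminiy sketchi — fixed size + sliding window (overlap) kombinatsiyasi. Soddalashtirish: token o'rniga so'z bo'yicha kesadi; production'da tiktoken bilan token sanash kerak:

```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Sliding-window chunking: har qadam (chunk_size - overlap) birlik siljiydi.
    Soddalashtirish: so'z bo'yicha; production'da token (tiktoken) bo'yicha."""
    words = text.split()
    if not words:
        return []
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break
    return chunks

doc = " ".join(str(i) for i in range(1000))
chunks = chunk_text(doc, chunk_size=500, overlap=50)
print(len(chunks))   # → 3, qo'shni chunk'lar 50 so'z overlap bilan
```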
12.2

hybrid rag — production baseline

Pure vector search yetarli emas — hybrid nima va nega?
javob
Vector semantic, BM25 keyword

Vector search "ma'no" bo'yicha qidiradi — "Python error" va "Python exception"ni bir deb ko'radi. Lekin ID, model name, unique term'larni yomon topadi — "GPT-4o-mini"ni "GPT-4o"dan ajrata olmaydi. BM25 keyword search bu yerda kuchli. Hybrid = ikkalasi.

Hybrid search implementation

hybrid_rag.py
"""Hybrid = BM25 (keyword) + Vector (semantic) + RRF fusion"""

async def hybrid_search(question: str, k: int = 20) -> list[dict]:
    # 1. Vector search — semantic
    query_emb = await embed(question)
    vector_results = await qdrant.search(
        collection_name="kb",
        query_vector=query_emb,
        limit=k
    )
    
    # 2. BM25 search — keyword (PostgreSQL yoki Elasticsearch)
    bm25_results = await db.fetch_all("""
        SELECT id, chunk, doc_id,
               ts_rank_cd(search_vector, query) AS score
        FROM knowledge_chunks,
             plainto_tsquery('english', $1) query
        WHERE search_vector @@ query
        ORDER BY score DESC
        LIMIT $2
    """, question, k)
    
    # 3. Reciprocal Rank Fusion (RRF) — 2 ro'yxatni birlashtirish
    scores = {}
    RRF_K = 60
    
    for rank, r in enumerate(vector_results):
        scores[r.id] = scores.get(r.id, 0) + 1.0 / (RRF_K + rank)
    
    for rank, r in enumerate(bm25_results):
        scores[r["id"]] = scores.get(r["id"], 0) + 1.0 / (RRF_K + rank)
    
    # 4. Sort va return
    top_ids = sorted(scores.keys(), key=lambda i: scores[i], reverse=True)[:10]
    return [await get_chunk(id) for id in top_ids]


async def rag_with_rerank(question: str) -> dict:
    # 1. Hybrid retrieval — over-fetch (20)
    candidates = await hybrid_search(question, k=20)
    
    # 2. Reranker — cross-encoder (expensive lekin aniq)
    #    Query + chunk'ni bitta model'ga yuboramiz, aniqroq score
    from cohere import AsyncClient as CohereClient
    cohere = CohereClient(...)
    
    rerank_result = await cohere.rerank(
        query=question,
        documents=[c["text"] for c in candidates],
        model="rerank-english-v3.0",
        top_n=5
    )
    
    # 3. Top 5 — eng sifatli context
    reranked = [candidates[r.index] for r in rerank_result.results]
    
    # 4. LLM
    return await generate_with_context(question, reranked)
Reranker — hidden superpower

Reranker (Cohere Rerank, BGE-reranker) retrieval accuracy'ni 30-50% oshiradi. Cross-encoder — query va har candidate'ni birga yuborib, aniq score chiqaradi. Over-fetch 20 → rerank → top 5 — 2026 production standard.

10 ta RAG arxitekturasi

| Type | Nima | Use case |
|---|---|---|
| 1. Naive RAG | Vanilla embed → retrieve → generate | Prototype, boshlang'ich |
| 2. Hybrid RAG | BM25 + vector + RRF | Production baseline |
| 3. Rerank RAG | Over-fetch + cross-encoder rerank | High precision kerak |
| 4. HyDE | Hypothetical Document Embedding — LLM javob generate qilib, shuni embed qilib qidiradi | Sparse queries |
| 5. Graph RAG | Knowledge graph + vector | Entity relationships kerak |
| 6. Multi-modal RAG | Text + image + video | Katalog, media |
| 7. Agentic RAG | Agent iterative retrieves, decides | Multi-hop, research |
| 8. Self-correcting (CRAG) | Retrieval quality check → re-retrieve | Complex queries |
| 9. Adaptive RAG | Question type → different strategy | Mixed workload |
| 10. Contextual RAG | Har chunk'ga "context prefix" (Anthropic) | 35%+ accuracy improvement |
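Jadvaldagi HyDE (№4) g'oyasini minimal sketch bilan ko'rsatamiz: savolning o'zini emas, LLM yozgan gipotetik javobni embed qilib qidiramiz — gipotetik javob real hujjatlarga semantik jihatdan yaqinroq bo'ladi. `generate`, `embed`, `vector_search` bu yerda tashqaridan beriladigan taxminiy helper'lar (stub bilan ko'rsatilgan, real implementatsiya emas):

```python
import asyncio
from typing import Awaitable, Callable

async def hyde_search(
    question: str,
    generate: Callable[[str], Awaitable[str]],              # LLM chaqiruvi (taxminiy)
    embed: Callable[[str], Awaitable[list[float]]],         # embedding API (taxminiy)
    vector_search: Callable[[list[float]], Awaitable[list[dict]]],
) -> list[dict]:
    """HyDE: question embedding o'rniga gipotetik javob embedding'i bilan qidirish."""
    # 1. LLM'dan gipotetik (to'g'riligi shart emas, "hujjatga o'xshash") javob
    hypothetical = await generate(
        f"Savolga qisqa, faktual uslubda javob yozing (to'g'riligi shart emas):\n{question}"
    )
    # 2. Savol o'rniga gipotetik javobni embed qilamiz
    emb = await embed(hypothetical)
    # 3. Qolgan pipeline o'zgarmaydi — oddiy vector search
    return await vector_search(emb)

# Usage sketch — stub helper'lar bilan
async def _demo() -> list[dict]:
    async def generate(prompt: str) -> str:
        return "gipotetik javob"                       # real'da: LLM chaqiruvi
    async def embed(text: str) -> list[float]:
        return [float(len(text))]                      # real'da: embedding API
    async def vector_search(emb: list[float]) -> list[dict]:
        return [{"text": "doc", "score": 0.9}]         # real'da: qdrant.search
    return await hyde_search("RAG nima?", generate, embed, vector_search)

docs = asyncio.run(_demo())
```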

Contextual Retrieval — breakthrough texnika

Anthropic'ning Contextual Retrieval texnikasi: har chunk'dan oldin LLM'dan "bu chunk qaysi context'dan?" deb qisqa izoh qo'shdiramiz. Embedding va BM25 sifatini sezilarli oshiradi.

contextual_rag.py
CONTEXTUALIZE_PROMPT = """Quyida to'liq hujjat va undan olingan qism.
Berilgan qism hujjatning qaysi qismiga tegishli, qanday context'da ekanini qisqa 
50-100 so'zda tushuntiring.

Full document:
{full_doc}

Chunk:
{chunk}

Context (faqat qisqa izoh, hech narsa qo'shmang):"""

async def contextualize_chunk(full_doc: str, chunk: str) -> str:
    """Har chunk'dan oldin qo'shiladigan kontekst izoh"""
    response = await client.messages.create(
        model="claude-haiku-4-5-20251001",  # kichik model — arzon
        max_tokens=150,
        messages=[{
            "role": "user", 
            "content": CONTEXTUALIZE_PROMPT.format(full_doc=full_doc[:50000], chunk=chunk)
        }]
    )
    return response.content[0].text

async def index_with_context(doc_id: str, content: str):
    chunks = chunk_text(content, 500)
    
    # Har chunk'ni kontekstualizatsiya qilamiz
    contextualized = await asyncio.gather(*[
        contextualize_chunk(content, c) for c in chunks
    ])
    
    # Chunk = context + original
    final_chunks = [f"{ctx}\n\n{chunk}" for ctx, chunk in zip(contextualized, chunks)]
    
    # Embed va store
    embeddings = await embed_batch(final_chunks)
    await qdrant.upsert(...)

Agentic RAG — next frontier

Classic RAG — one-shot: retrieve once, generate once. Agentic RAG — LLM agent iterativ qidiradi, o'zi qaror qiladi "yetarli ma'lumot olindimi?"

agentic_rag.py
"""Agent retrieval'ni tool sifatida ishlatadi"""

TOOLS = [
    {
        "name": "search_knowledge",
        "description": "Ichki KB'dan qidirish. Multiple tadqiqot savollari uchun bir necha marta chaqirish mumkin.",
        "input_schema": {
            "type": "object",
            "properties": {"query": {"type": "string"}, "limit": {"type": "integer", "default": 5}},
            "required": ["query"]
        }
    },
    {
        "name": "search_web",
        "description": "Yangi tashqi ma'lumot kerak bo'lsa web search",
        "input_schema": {
            "type": "object",
            "properties": {"query": {"type": "string"}},
            "required": ["query"]
        }
    }
]

AGENTIC_SYSTEM = """Sen research agentsan. Murakkab savollarga javob berish uchun:
1. Savolni sub-savollarga bo'l
2. Har biri uchun search_knowledge yoki search_web ishlat
3. Yetarli ma'lumot yig'ilgach — sintez qil
4. MAX 5 ta search iteratsiyasi"""

async def agentic_rag_answer(question: str, max_iterations: int = 5):
    messages = [{"role": "user", "content": question}]
    iteration = 0
    
    while iteration < max_iterations:
        response = await client.messages.create(
            model="claude-opus-4-7",
            max_tokens=2000,
            tools=TOOLS,
            system=AGENTIC_SYSTEM,
            messages=messages
        )
        
        if response.stop_reason == "end_turn":
            return response.content[0].text
        
        messages.append({"role": "assistant", "content": response.content})
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                if block.name == "search_knowledge":
                    results = await hybrid_search(block.input["query"])
                elif block.name == "search_web":
                    # search_web — tashqi web qidiruv helper'i (bu yerda taxminiy nom)
                    results = await search_web(block.input["query"])
                else:
                    results = [{"error": f"unknown tool: {block.name}"}]
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": json.dumps(results[:5])
                })
        
        messages.append({"role": "user", "content": tool_results})
        iteration += 1
    
    # Budget tugadi — hozirgi ma'lumot bilan javob
    messages.append({"role": "user", "content": "Budget tugadi. Hozirgi ma'lumot asosida javob ber."})
    final = await client.messages.create(model="claude-opus-4-7", max_tokens=2000, messages=messages)
    return final.content[0].text
Agentic RAG — qimmat, lekin aniq

Classic RAG: ~2000 token/query. Agentic RAG: 10-20k token (3-10x). Lekin multi-hop savollarga javob sifati dramatic oshadi. Stop condition muhim — iteration budget, confidence threshold, cost cap.
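Yuqorida sanalgan stop condition'lar (iteration budget, cost cap) uchun minimal tracker sketchi — narxlar taxminiy, modelga qarab o'zgaradi:

```python
from dataclasses import dataclass

@dataclass
class AgentBudget:
    """Agentic RAG uchun stop-condition tracker: iteration + cost cap."""
    max_iterations: int = 5
    max_cost_usd: float = 0.50
    cost_per_1k_tokens: float = 0.015   # taxminiy narx — modelga qarab o'zgaradi
    iterations: int = 0
    tokens_used: int = 0

    def record(self, tokens: int) -> None:
        """Har iteratsiyadan keyin chaqiriladi."""
        self.iterations += 1
        self.tokens_used += tokens

    @property
    def cost_usd(self) -> float:
        return self.tokens_used / 1000 * self.cost_per_1k_tokens

    @property
    def exhausted(self) -> bool:
        return self.iterations >= self.max_iterations or self.cost_usd >= self.max_cost_usd

budget = AgentBudget(max_iterations=5, max_cost_usd=0.10)
budget.record(3000)   # 1-iteratsiya: 3k token
budget.record(4000)   # 2-iteratsiya: 4k token
print(budget.exhausted)   # True — cost cap oshdi ($0.105 ≥ $0.10)
```

Agent loop'ida `while not budget.exhausted:` sharti `while iteration < max_iterations` o'rnini bosadi.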

12.3

rag evaluation — how to measure

RAG sistemangiz yaxshi ishlayaptimi qanday bilasiz?
javob

Key metrics

| Metric | Nima o'lchaydi | Tool |
|---|---|---|
| Context Precision | Retrieved chunks relevantmi? | RAGAS |
| Context Recall | Hamma kerakli chunk topildimi? | RAGAS |
| Faithfulness | Javob context'ga mosmi? | RAGAS |
| Answer Relevance | Javob savolga mosmi? | RAGAS |
| Hit Rate @k | Top-k'da to'g'ri chunk bormi? | Custom |
| MRR | To'g'ri chunk o'rtacha qaysi o'rinda? | Custom |
ragas_eval.py
from ragas import evaluate
from ragas.metrics import context_precision, context_recall, faithfulness, answer_relevancy
from datasets import Dataset

# Ground truth dataset
eval_data = Dataset.from_dict({
    "question": [...],           # savollar
    "answer": [...],             # sizning RAG javoblari
    "contexts": [...],           # retrieved chunks
    "ground_truth": [...]        # to'g'ri javoblar (annotator)
})

result = evaluate(
    eval_data,
    metrics=[context_precision, context_recall, faithfulness, answer_relevancy]
)
print(result)
# {'context_precision': 0.82, 'context_recall': 0.78, ...}
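Jadvaldagi "Custom" metrikalar — Hit Rate @k va MRR — ni qo'lda hisoblashning minimal sketchi:

```python
def hit_rate_at_k(retrieved: list[str], relevant: set[str], k: int = 5) -> float:
    """Top-k ichida kamida bitta to'g'ri chunk bormi? (0.0 yoki 1.0)"""
    return 1.0 if any(doc_id in relevant for doc_id in retrieved[:k]) else 0.0

def mrr(retrieved: list[str], relevant: set[str]) -> float:
    """Reciprocal rank: birinchi to'g'ri chunk'ning 1/o'rni (topilmasa 0)."""
    for rank, doc_id in enumerate(retrieved, start=1):
        if doc_id in relevant:
            return 1.0 / rank
    return 0.0

# Eval dataset bo'yicha o'rtacha
queries = [
    (["c3", "c1", "c9"], {"c1"}),   # to'g'ri chunk 2-o'rinda → RR = 0.5
    (["c7", "c8", "c2"], {"c4"}),   # topilmadi → RR = 0.0
]
avg_mrr = sum(mrr(r, rel) for r, rel in queries) / len(queries)
print(avg_mrr)   # → 0.25
```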
13

embeddings & vector search — deep dive

13.1

embedding models — tanlov va tradeoffs

Qaysi embedding model'ni qachon tanlash?
javob
Embedding — text → vector

Embedding model matnni fixed-length raqamlar vektoriga aylantiradi. Semantik o'xshashlik — vektorlar o'rtasidagi masofaga proporsional. "Python code" va "Python dasturlash" — kuchli o'xshashlik. Embedding quality'si RAG natijalarini to'g'ridan-to'g'ri belgilaydi.

Embedding models landscape

| Model | Dim | Context | Narx / Deploy | Best for |
|---|---|---|---|---|
| OpenAI text-embedding-3-large | 3072 | 8192 | $0.13/1M tokens | English, general |
| OpenAI text-embedding-3-small | 1536 | 8192 | $0.02/1M | Cost-effective baseline |
| Cohere embed-v3 | 1024 | 512 | $0.10/1M | Multilingual |
| Voyage-3 | 1024 | 32k | $0.06/1M | Long docs, technical |
| BGE-M3 (BAAI) | 1024 | 8k | Self-host free | Multilingual open-source |
| Nomic-embed-text-v1.5 | 768 | 8k | Self-host / $0.02 | Open, good English |
| Jina-embeddings-v3 | 1024 | 8k | Self-host | Multilingual, fine-tunable |

Matryoshka embeddings — adaptive trick

Yangi embedding modellar dimension truncation'ni (Matryoshka Representation Learning) qo'llab-quvvatlaydi. 3072 dim embedding'ni 256 dim'ga kesasiz — ko'p hollarda 90%+ sifat saqlanadi, disk/RAM esa 12x tejaladi.

matryoshka.py
# OpenAI — dimensions parameter
response = await openai.embeddings.create(
    model="text-embedding-3-large",
    input=["hello"],
    dimensions=512    # 3072 → 512 ga truncate
)

# Yoki oddiy indexing:
# Coarse search: 256-dim (tez)
# Rerank: to'liq 3072-dim (aniq)

Multilingual — o'zbek tiliga

O'zbek tilida ishlaydigan embedding modellari: Cohere embed-v3 (multilingual), BGE-M3 (open-source), multilingual-e5-large (self-host). OpenAI hozir yaxshi ishlaydi, lekin eng yaxshisi emas.

13.2

rerankers — precision booster

Reranker nima va nega 30-50% accuracy oshiradi?
javob
Bi-encoder vs Cross-encoder

Embedding model (bi-encoder): query va chunk'ni alohida embed qiladi. Tez, lekin query-chunk interaction yo'q. Reranker (cross-encoder): query + chunk birga model'ga yuboradi. Sekinroq, lekin ancha aniqroq.

rerank.py
# Option 1 — Cohere Rerank (managed)
from cohere import AsyncClient

cohere = AsyncClient(api_key="...")

async def cohere_rerank(query: str, docs: list[str], top_n: int = 5):
    result = await cohere.rerank(
        query=query,
        documents=docs,
        model="rerank-multilingual-v3.0",  # 100+ tillar
        top_n=top_n
    )
    return [(r.index, r.relevance_score) for r in result.results]

# Option 2 — BGE-reranker (self-host)
from FlagEmbedding import FlagReranker

reranker = FlagReranker('BAAI/bge-reranker-v2-m3', use_fp16=True)

def local_rerank(query: str, docs: list[str], top_n: int = 5):
    pairs = [[query, doc] for doc in docs]
    scores = reranker.compute_score(pairs, normalize=True)
    ranked = sorted(enumerate(scores), key=lambda x: x[1], reverse=True)[:top_n]
    return ranked

# Option 3 — Jina reranker (jina-reranker-v2)
# HTTP API (api.jina.ai) yoki self-host (sentence-transformers CrossEncoder) orqali
Over-fetch → rerank pattern

Production'da har doim: retrieve 20-30 → rerank top 5. Bu "over-fetch" strategiyasi latency'ni unchalik oshirmaydi (10-50ms qo'shadi), lekin accuracy dramatic oshadi. Har RAG system'da shu pattern kerak.

14

fine-tuning & local llms — lora, qlora, deployment

14.1

qachon fine-tune kerak vs rag vs prompting

Prompt, RAG, yoki fine-tuning — qaysi birini qachon?
javob
Decision tree — nima kerak?
─────────────────────────────────
Yangi/aniq faktual ma'lumot kerakmi?
├─ Ha → RAG (knowledge injection)
└─ Yo'q, bilim fixed
   └─ Behavior/style/format o'zgartirmoqchimisiz?
      ├─ Yo'q → Prompt engineering yetarli
      ├─ Ha, arzon usul → Few-shot prompt, system prompt
      └─ Ha, consistent/volume → Fine-tune (LoRA/QLoRA)

Hybrid pattern — eng kuchli: Base model + Fine-tuned adapter + RAG context
| Texnika | Cost | Latency | Qachon yaxshi |
|---|---|---|---|
| Prompting | $ (faqat API calls) | Normal | Boshlash, kichik use-case |
| Few-shot | $$ (token cost oshdi) | Normal | Oddiy format, 3-5 misol |
| RAG | $$ (embed + retrieve + gen) | Sekinroq | Faktual, yangilanib turadi |
| LoRA fine-tune | $$$ (bir martalik) | Fast inference | Style, format, domain |
| Full fine-tune | $$$$$ | Fast | Kamdan-kam holat, enterprise |
14.2

lora va qlora — parameter-efficient fine-tuning

LoRA va QLoRA qanday ishlaydi va qanday qilinadi?
javob
LoRA magicasi

7B parametrli model'ning hammasini fine-tune qilish → 120 GB VRAM kerak. LoRA (Low-Rank Adaptation) — base model parametrlarini "muzlatamiz", kichik (<0.1%) qo'shimcha "adapter" matritsalarni o'rgatamiz. 24 GB VRAM yetadi. QLoRA — base model'ni 4-bit'ga quantize + LoRA. 12 GB VRAM — RTX 4070 Ti!
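Adapter qancha kichikligini oddiy arifmetika bilan tekshirish mumkin: d_in×d_out og'irlik matritsasi uchun LoRA r·(d_in + d_out) parametr qo'shadi. Quyidagi raqamlar taxminiy misol (4096 o'lchamli attention proyeksiyasi); umumiy ulush rank va target_modules tanloviga bog'liq:

```python
def lora_params(d_in: int, d_out: int, r: int) -> int:
    """d_in×d_out og'irlik matritsasi uchun LoRA adapter: A (d_in×r) va B (r×d_out)."""
    return r * (d_in + d_out)

# Taxminiy misol: 4096×4096 attention proyeksiyasi, r=16
full = 4096 * 4096                       # ≈16.8M param — muzlatiladi
adapter = lora_params(4096, 4096, r=16)  # 131,072 param — o'rgatiladi
print(adapter / full)                    # → 0.0078125 — shu layer'ning ~0.8%
```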

Hardware requirements

| Method | Model size | VRAM kerak | GPU |
|---|---|---|---|
| Full fine-tune | 7B | ~120 GB | 2× A100 80GB |
| LoRA | 7B | 16-24 GB | RTX 4090 / A100 40GB |
| QLoRA | 7B | 8-12 GB | RTX 4070 Ti / 3090 |
| QLoRA | 70B | 48 GB | A6000 / 2× RTX 4090 |

Complete QLoRA training (Unsloth)

qlora_train.py
"""QLoRA fine-tuning Llama 3.1 8B on custom domain data"""
from unsloth import FastLanguageModel
from trl import SFTTrainer
from transformers import TrainingArguments
from datasets import load_dataset
import torch

# 1. Model yuklash (4-bit quantized)
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name="unsloth/Meta-Llama-3.1-8B-Instruct",
    max_seq_length=2048,
    dtype=None,
    load_in_4bit=True,  # QLoRA — NF4 quantization
)

# 2. LoRA adapter qo'shish
model = FastLanguageModel.get_peft_model(
    model,
    r=16,              # rank — capacity vs overfitting tradeoff
    target_modules=[
        "q_proj", "k_proj", "v_proj", "o_proj",  # attention
        "gate_proj", "up_proj", "down_proj"       # MLP
    ],
    lora_alpha=16,     # scaling — odatda r ga teng
    lora_dropout=0,
    bias="none",
    use_gradient_checkpointing="unsloth",  # RAM tejash
    random_state=42,
)

# 3. Dataset prep — instruction format
def format_example(example):
    return {
        "text": f"""### Instruction:
{example['instruction']}

### Input:
{example.get('input', '')}

### Response:
{example['output']}"""
    }

dataset = load_dataset("json", data_files="training_data.jsonl", split="train")
dataset = dataset.map(format_example)

# 4. Training args
trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    train_dataset=dataset,
    dataset_text_field="text",
    max_seq_length=2048,
    args=TrainingArguments(
        output_dir="outputs",
        per_device_train_batch_size=2,
        gradient_accumulation_steps=4,   # effective batch = 8
        num_train_epochs=3,
        learning_rate=2e-4,              # LoRA — 1e-4 to 3e-4
        warmup_ratio=0.03,
        lr_scheduler_type="cosine",
        logging_steps=10,
        save_steps=100,
        fp16=not torch.cuda.is_bf16_supported(),
        bf16=torch.cuda.is_bf16_supported(),
        optim="adamw_8bit",              # memory-efficient
        seed=42,
    ),
)

# 5. Train!
trainer.train()

# 6. Save adapter (kichik — ~200 MB)
model.save_pretrained("my-finetuned-adapter")
tokenizer.save_pretrained("my-finetuned-adapter")

# 7. Merge va GGUF export (Ollama uchun)
model.save_pretrained_merged("merged_model", tokenizer, save_method="merged_16bit")
model.save_pretrained_gguf("gguf_model", tokenizer, quantization_method="q4_k_m")

Dataset prep — kalit muvaffaqiyat

Dataset — quality over quantity

500 ta high-quality misol > 50,000 ta noisy misol. Har misol model'ga "ideal javob" ko'rsatadi. 1-2 xato misol modelga noto'g'ri habit o'rgatadi. Human review shart.

dataset.jsonl
{"instruction": "Berilgan Python kodda xato topib, tuzatilgan versiyani bering.",
 "input": "def divide(a, b): return a / b",
 "output": "```python\ndef divide(a: float, b: float) -> float:\n    if b == 0:\n        raise ValueError('Cannot divide by zero')\n    return a / b\n```\n\nTuzatish: `ZeroDivisionError` himoyasi va type hints qo'shildi."}
{"instruction": "SQL query'ni optimallashtiring.",
 "input": "SELECT * FROM orders WHERE UPPER(email) = 'TEST@EXAMPLE.COM'",
 "output": "..."}

Local deployment — Ollama

ollama_deploy.sh
# Ollama install
curl -fsSL https://ollama.com/install.sh | sh

# Modelfile yaratish (fine-tuned modelni Ollama'ga qo'shish)
cat > Modelfile << 'EOF'
FROM ./gguf_model/unsloth.Q4_K_M.gguf

TEMPLATE """### Instruction:
{{ .Prompt }}

### Response:
"""

PARAMETER temperature 0.7
PARAMETER top_p 0.9
PARAMETER num_ctx 2048
EOF

# Model create
ollama create my-assistant -f Modelfile

# Ishga tushirish
ollama run my-assistant

# API sifatida (OpenAI-compatible)
# http://localhost:11434/v1/chat/completions
ollama_client.py
from openai import OpenAI

# Ollama OpenAI-compatible API'ni qo'llab-quvvatlaydi
client = OpenAI(
    base_url="http://localhost:11434/v1",
    api_key="ollama"  # ignored
)

response = client.chat.completions.create(
    model="my-assistant",
    messages=[
        {"role": "user", "content": "SQL query'ni qanday optimallashtirish?"}
    ]
)
print(response.choices[0].message.content)
Amaliy realitet

$300 GPU + 500 ta misol + 2-3 soat training = production-ready domain-specific model. Hamma biznes o'ziga shunday model yarata oladi. Brand voice, compliance, offline inference. Hybrid: base model (general) + LoRA adapter (domain) + RAG (facts) — 2026 arxitektura.

15

ai agents, multi-agent & mcp

15.1

ai agents — autonomous task executors

AI agent nima va passiv chatbot'dan qanday farq qiladi?
javob
Chatbot vs Agent — asosiy farq

Chatbot: savol → javob. Passiv, bir martalik. Foydalanuvchi har qadamda yo'l-yo'riq berishi kerak.
Agent: maqsad beriladi → agent o'zi reja tuzadi → kerakli vositalarni (tool) ishlatadi → natijani tekshiradi → agar xato bo'lsa, o'zini tuzatadi → maqsadga yetguncha takrorlaydi. Autonomous, iterativ. "GitHub issue'larni tahlil qilib, eng muhimini Discord'ga yozib qo'y" — bu agent ishi, chatbot qila olmaydi.

ReAct pattern — agent "fikrlash" tarzini tushunish

Agent qanday qaror qiladi? Eng keng tarqalgan yondashuv — ReAct (Reason + Act). Har qadamda LLM uch narsa qiladi: (1) Thought — "Endi nima qilishim kerak, qanday mantig'?" deb o'ylaydi (inson kabi fikrlash); (2) Action — biror tool chaqiradi (DB query, API call, file read); (3) Observation — tool natijasini ko'radi. Keyin yangi Thought boshlanadi. Bu cycle maqsadga yetilguncha yoki max_iterations tugaguncha davom etadi.

Misol: "Beeline'ning oxirgi oylik revenue qancha?" → Thought: "DB'ga so'rov yuborish kerak" → Action: query_db("SELECT SUM(revenue)...")Observation: "1,240,000,000 UZS" → Thought: "Endi bu raqamni formatlash kerak" → Action: "format_currency(...)" → Final Answer.

Agent anatomy — 4 komponent

- LLM Brain — plan, reason, decide
- Memory — short + long term (vector DB, context)
- Tools — APIs, search, DB, file ops, MCP
- Planning — decompose goal into steps
- Reflection — self-critique, error recovery

Agent = LLM + Memory + Tools + Planning + Reflection loop

ReAct pattern — reasoning + acting

react_agent.py
"""ReAct: Thought → Action → Observation loop"""

REACT_PROMPT = """Sen agentsan. Berilgan vazifani bajarish uchun tool'lardan foydalanasan.

Har qadamda:
Thought: nima qilmoqchiligingni o'yla
Action: tool chaqirish
Observation: tool natijasi (avtomatik to'ldiriladi)

Yakuniy javob bo'lganda:
Final Answer: to'liq javob

Tools:
- search_docs(query): ichki hujjatlarda qidirish
- fetch_url(url): sahifa matnini olish
- calculate(expression): hisoblash
- query_db(sql): ma'lumotlar bazasidan

Vazifa: {task}"""

async def react_agent(task: str, max_steps: int = 10):
    messages = [{"role": "user", "content": REACT_PROMPT.format(task=task)}]
    
    for step in range(max_steps):
        response = await client.messages.create(
            model="claude-opus-4-7",
            max_tokens=1024,
            tools=TOOLS,
            messages=messages,
        )
        
        messages.append({"role": "assistant", "content": response.content})
        
        # Terminal condition
        if response.stop_reason == "end_turn":
            for block in response.content:
                if block.type == "text" and "Final Answer:" in block.text:
                    return block.text.split("Final Answer:")[-1].strip()
        
        # Tool execution
        if response.stop_reason == "tool_use":
            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    try:
                        result = await execute_tool(block.name, block.input)
                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": json.dumps(result)[:5000]  # truncate
                        })
                    except Exception as e:
                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": f"Error: {str(e)}",
                            "is_error": True
                        })
            messages.append({"role": "user", "content": tool_results})
    
    return "Max steps reached without final answer"
15.2

multi-agent systems — orchestration patterns

Bir agent yetmaydi — ko'p agent qanday birga ishlaydi?
javob
Ixtisoslashgan agent'lar

Bitta "hamma narsa qiladigan" agent sifat va aniqlik'da past. Yaxshiroq: researcheranalystwriter — har biri bitta ishga ixtisoslashgan. Software team kabi: PM, backend dev, frontend dev, QA.

Supervisor pattern — hierarchical

multi_agent.py
"""Supervisor agent sub-agent'larga vazifalar tarqatadi"""
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class AgentState(TypedDict):
    task: str
    research: str
    analysis: str
    final_report: str
    next_step: str

# Specialist agents
async def researcher_agent(state: AgentState):
    """Ma'lumot yig'ish"""
    result = await run_agent(
        system="Sen researcher. Faqat faktlarni yig'. Source cite qil.",
        tools=[search_web, search_docs],
        task=state["task"]
    )
    return {"research": result}

async def analyst_agent(state: AgentState):
    """Tahlil va insight"""
    result = await run_agent(
        system="Sen data analyst. Raqamlarni tushuntir, trend'larni top.",
        tools=[calculate, query_db],
        task=f"Research:\n{state['research']}\n\nTahlil qil."
    )
    return {"analysis": result}

async def writer_agent(state: AgentState):
    """Final report yozish"""
    result = await run_agent(
        system="Sen technical writer. Aniq, qisqa, strukturali yoz.",
        task=f"Research:\n{state['research']}\n\nAnalysis:\n{state['analysis']}\n\nReport yoz."
    )
    return {"final_report": result}

# Supervisor
async def supervisor(state: AgentState) -> dict:
    """Supervisor keyingi kimni chaqirishni aniqlaydi"""
    response = await client.messages.create(
        model="claude-opus-4-7",
        system="Sen supervisor. Berilgan state'ga qarab keyingi qadamni tanla: "
               "researcher, analyst, writer, yoki done.",
        messages=[{"role": "user", "content": json.dumps(state)}]
    )
    next_step = response.content[0].text.strip().lower()
    return {"next_step": next_step}

def route(state: AgentState) -> Literal["researcher", "analyst", "writer", END]:
    return state["next_step"] if state["next_step"] != "done" else END

# Graph tuzish
graph = StateGraph(AgentState)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher_agent)
graph.add_node("analyst", analyst_agent)
graph.add_node("writer", writer_agent)

graph.set_entry_point("supervisor")
graph.add_conditional_edges("supervisor", route)
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")

app = graph.compile()

# Run
result = await app.ainvoke({"task": "2026 RAG arxitekturalarini tahlil qiling"})

Swarm pattern — peer-to-peer

Agent'lar bir-biriga "handoff" qiladi, supervisor yo'q. Sodda, dinamik. OpenAI Swarm kutubxonasi yoki CrewAI.

crewai.py
from crewai import Agent, Task, Crew, Process

researcher = Agent(
    role="Research Analyst",
    goal="2026 backend trendlarini chuqur tadqiq qilish",
    backstory="10 yil tajribali tech analyst",
    tools=[search_tool, browser_tool],
    verbose=True
)

writer = Agent(
    role="Technical Writer",
    goal="Research natijalari asosida clear article yozish",
    backstory="Tech blog writer",
    verbose=True
)

research_task = Task(
    description="2026 backend architecture trends haqida comprehensive research",
    agent=researcher,
    expected_output="Detailed findings with sources"
)

write_task = Task(
    description="Research asosida 2000 so'zli blog post yozish",
    agent=writer,
    expected_output="Publication-ready blog post",
    context=[research_task]   # dependency
)

crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential
)

result = crew.kickoff()

Multi-agent patterns comparison

Pattern          | Coordination       | Best for
-----------------|--------------------|-------------------------
Sequential       | Fixed pipeline     | Well-defined workflows
Supervisor       | Central dispatcher | Complex, branching tasks
Swarm/Handoff    | Peer-to-peer       | Dynamic, unpredictable
Hierarchical     | Teams + managers   | Large-scale, specialty
Debate/Consensus | Multiple agree     | High-stakes decisions
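Jadvaldagi birinchi pattern — Sequential — eng soddasi. Quyida uning minimal eskizi: agent nomlari (researcher/analyst/writer) shartli, real loyihada har bir funksiya LLM chaqiruvi bo'ladi, bu yerda stub'lar bilan almashtirilgan:

```python
import asyncio

# Faraziy agent'lar — real loyihada har biri LLM chaqiruvi bo'ladi
async def researcher(task: str) -> str:
    return f"research({task})"

async def analyst(research: str) -> str:
    return f"analysis({research})"

async def writer(analysis: str) -> str:
    return f"report({analysis})"

async def sequential_pipeline(task: str) -> str:
    """Fixed pipeline: har agent oldingi agent natijasini input qilib oladi"""
    research = await researcher(task)
    analysis = await analyst(research)
    return await writer(analysis)

result = asyncio.run(sequential_pipeline("RAG trendlari"))
# result == "report(analysis(research(RAG trendlari)))"
```

Supervisor pattern'dan farqi: routing logikasi kodda qotirilgan — LLM qaror qabul qilmaydi, shuning uchun arzon va predictable.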
15.3

mcp — model context protocol

MCP nima va nima uchun muhim?
javob
MCP — "USB-C for AI" — nima muammoni hal qiladi?

Muammo: har AI app o'z-o'zicha tool integration yozadi. GitHub Copilot o'z GitHub integration'ini yozgan, Cursor o'zini, Claude Desktop o'zini — har biri turlicha. Agar siz PostgreSQL MCP server yozsangiz, faqat bitta AI bilan ishlardi.

MCP yechimi: Anthropic 2024-yil oxirida open standard chiqardi. MCP server bitta protokolga muvofiq yoziladi → Claude, ChatGPT, Cursor, va istalgan MCP-compatible AI bilan ishlaydi. "Bir marta yoz, hamma joyda" — USB-C analogi. 2025-2026'da 2000+ MCP server ecosystem paydo bo'ldi (GitHub, Slack, Notion, PostgreSQL, Jira...).

MCP uchta primitiv — Tools, Resources, Prompts

Tools — LLM chaqira oladigan funksiyalar (DB query, API call, fayl o'qish) — function calling'ga o'xshash, lekin standartlashtirilgan. Resources — LLM o'qiy oladigan ma'lumotlar (fayl tarkibi, DB row, URL content) — tool chaqirmasdan context'ga qo'shiladi. Prompts — oldindan tayyorlangan prompt template'lar (masalan, "code review" — standart ko'rsatmalar bilan). Bitta MCP server'da bu uchalasi ham bo'lishi mumkin.

MCP architecture

MCP client'lar: Claude Desktop · Cursor IDE · Your App (har birida MCP client)
MCP Protocol: JSON-RPC 2.0 over stdio / SSE / WebSocket — Tools · Resources · Prompts
MCP server'lar: GitHub MCP · Slack MCP · Postgres MCP · Your custom MCP

MCP server yozish — minimal misol

mcp_server.py
"""MCP server — har AI client foydalanishi mumkin"""
import json

from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.types as types

server = Server("weather-mcp")

# 1. Tool — LLM chaqirishi mumkin
@server.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="get_weather",
            description="Shahar ob-havosi",
            inputSchema={
                "type": "object",
                "properties": {
                    "city": {"type": "string"},
                    "units": {"type": "string", "enum": ["c", "f"]}
                },
                "required": ["city"]
            }
        )
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    if name == "get_weather":
        result = await fetch_weather(arguments["city"], arguments.get("units", "c"))
        return [types.TextContent(type="text", text=json.dumps(result))]
    raise ValueError(f"Unknown tool: {name}")

# 2. Resource — static ma'lumot (masalan, fayl)
@server.list_resources()
async def list_resources() -> list[types.Resource]:
    return [
        types.Resource(
            uri="weather://stations",
            name="Weather stations catalog",
            mimeType="application/json"
        )
    ]

@server.read_resource()
async def read_resource(uri: str) -> str:
    if uri == "weather://stations":
        return json.dumps(await get_stations())

# 3. Prompt — reusable template
@server.list_prompts()
async def list_prompts() -> list[types.Prompt]:
    return [
        types.Prompt(
            name="weather_report",
            description="Ob-havo hisoboti yaratish",
            arguments=[
                types.PromptArgument(name="city", description="Shahar", required=True)
            ]
        )
    ]

# Run stdio transport
if __name__ == "__main__":
    import asyncio
    from mcp.server.stdio import stdio_server
    
    async def main():
        async with stdio_server() as (read, write):
            await server.run(read, write, InitializationOptions(
                server_name="weather-mcp",
                server_version="1.0.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={}
                )
            ))
    
    asyncio.run(main())

Claude Desktop'ga MCP qo'shish

claude_desktop_config.json
{
  "mcpServers": {
    "weather": {
      "command": "python",
      "args": ["/path/to/mcp_server.py"]
    },
    "postgres": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-postgres", "postgresql://..."]
    },
    "github": {
      "command": "docker",
      "args": ["run", "-i", "--rm", "-e", "GITHUB_TOKEN", "mcp/github"],
      "env": {"GITHUB_TOKEN": "ghp_..."}
    }
  }
}
Nega MCP mainstream bo'ldi?

OpenAI, Google, Microsoft hammasi qabul qilishdi. MCP registry'da 500+ server mavjud. Your internal tools ham MCP server qilib yozsangiz — har AI tool'da ishlaydi. Enterprise integration standard bo'lmoqda.

16

production ai — llmops, safety, cost optimization

16.1

llmops — production ai lifecycle

AI tizimni production'da qanday boshqarish?
javob

Observability for AI — nimani o'lchash?

llm_tracking.py
from langfuse import Langfuse
from langfuse.decorators import observe

langfuse = Langfuse()

@observe()
async def rag_pipeline(question: str, tenant_id: str):
    # Avtomatik trace — har step log bo'ladi
    chunks = await retrieve(question)
    answer = await generate(question, chunks)
    return answer

@observe(as_type="generation")
async def generate(question: str, chunks: list):
    response = await client.messages.create(
        model="claude-opus-4-7",
        max_tokens=1000,
        messages=[...]
    )
    # Langfuse avtomatik: input, output, tokens, cost, latency
    return response.content[0].text

# User feedback
@observe()
async def rate_answer(trace_id: str, score: int, comment: str):
    langfuse.score(trace_id=trace_id, name="user_rating", value=score, comment=comment)

Cost optimization techniques

Texnika                    | Tejash          | Tradeoff
---------------------------|-----------------|----------------------
Prompt caching (Anthropic) | 90% arzon input | Static prefix kerak
Batch API (OpenAI)         | 50% arzon       | 24h latency
Model routing              | 60-80% tejash   | Router complexity
Response caching           | Variable        | Stale answers risk
Token-efficient prompts    | 20-40%          | Yozish vaqti
Semantic caching           | 30-50%          | Similar ≠ identical
model_router.py
"""Smart model router — oddiy savol'ga arzon, qiyin'ga qimmat model"""
import instructor
from typing import Literal
from pydantic import BaseModel

class Difficulty(BaseModel):
    level: Literal["simple", "medium", "complex"]
    reasoning: str

async def route_request(question: str) -> str:
    # Kichik tez model bilan difficulty classify
    diff = await classifier.messages.create(
        model="claude-haiku-4-5-20251001",
        response_model=Difficulty,
        messages=[{"role": "user", "content": 
                   f"Bu savol qanchalik qiyin?\n{question}"}]
    )
    
    if diff.level == "simple":
        # $0.25/1M input — eng arzon
        return await haiku_answer(question)
    elif diff.level == "medium":
        # $3/1M input — balanced
        return await sonnet_answer(question)
    else:
        # $15/1M input — eng aqlli
        return await opus_answer(question)

# Natijalar (1M so'rov):
#   Hamma Opus'da:  $15,000
#   Router bilan:    $3,200 (80% savol simple)
#   Saving: 80%

Safety — prompt injection, PII, content filtering

safety.py
import re

class SecurityError(Exception):
    """Input xavfsizlik tekshiruvidan o'tmaganda ko'tariladi"""

# 1. Prompt injection detection
INJECTION_PATTERNS = [
    r"ignore\s+(previous|all)\s+(instructions|prompts)",
    r"you\s+are\s+(now|actually)\s+",
    r"system\s*:\s*",
    r"<\s*system\s*>",
    r"jailbreak|DAN|developer mode",
]

def detect_injection(text: str) -> bool:
    text_lower = text.lower()
    return any(re.search(p, text_lower, re.IGNORECASE) for p in INJECTION_PATTERNS)

# 2. PII redaction (pre-LLM)
import presidio_analyzer
import presidio_anonymizer

analyzer = presidio_analyzer.AnalyzerEngine()
anonymizer = presidio_anonymizer.AnonymizerEngine()

def redact_pii(text: str) -> str:
    results = analyzer.analyze(text=text, language="en",
        entities=["CREDIT_CARD", "EMAIL_ADDRESS", "PHONE_NUMBER", "PERSON", "US_SSN"])
    anonymized = anonymizer.anonymize(text=text, analyzer_results=results)
    return anonymized.text

# 3. LLM output validation
from guardrails import Guard
from guardrails.hub import ValidJson, ToxicLanguage, DetectPII

guard = Guard().use_many(
    ValidJson(on_fail="exception"),
    ToxicLanguage(on_fail="filter"),
    DetectPII(on_fail="filter"),
)

async def safe_llm_call(prompt: str):
    # Input check
    if detect_injection(prompt):
        raise SecurityError("Prompt injection detected")
    
    redacted_prompt = redact_pii(prompt)
    response = await llm.generate(redacted_prompt)
    
    # Output validate
    validated = guard.parse(response)
    return validated.validated_output
EU AI Act — 2026 August compliance

August 2026'dan boshlab, high-risk AI system'lar EU'da majburiy ravishda hujjatlashtirilishi va audit qilinishi kerak: technical documentation, risk assessment, human oversight, conformity certificate. EU mijozlariga xizmat ko'rsatayotgan o'zbek kompaniyalari ham shu talablar ostiga tushadi.

Part V

architecture decisions & trade-offs

Texnikalarni bilish — yarmi. Qachon qaysini tanlash — ikkinchi yarmi. Bu qismda tradeoff'lar, decision framework'lar va 2026 career roadmap.

17

architecture decisions & trade-offs

17.1

tradeoffs matrix — umumiy qarorlar

Texnologiya tanlashda qanday tradeoff'lar bor?
javob
Architect'ning asosiy ishi

Code yozish — dasturchi ishi. Arxitektor — "trade-off'lar menejeri". Har qaror cost, latency, complexity, reliability, flexibility o'rtasida. "Eng yaxshi" yo'q — "sizning konteksingiz uchun optimal" bor.

Monolith vs Microservice

Aspect                    | Monolith                 | Microservices
--------------------------|--------------------------|----------------------------------
Development speed (start) | Tez                      | Sekin (infra setup)
Development speed (scale) | Sekinlashadi             | Tez (parallel teams)
Deployment                | Bir button, bir risk     | Independent, lekin orchestratsiya
Debugging                 | Oson (bitta log)         | Qiyin (distributed tracing kerak)
Data consistency          | ACID transactions        | Saga, eventual consistency
Network cost              | In-process (0)           | gRPC/HTTP overhead
Team size                 | <20 dev                  | 50+ dev
When to use               | Startup, MVP, small team | Scale, independent teams
Modular Monolith — to'g'ri yondashuv

Boshlang'ichda: modular monolith. Code'ni mantiqiy module'larga bo'ling (bounded contexts), lekin bitta deploy. Kerak bo'lganda module'ni alohida service qilib chiqarish oson. "Microservice first" — antipattern kichik jamoalar uchun.

SQL vs NoSQL

PostgreSQL (SQL) — default
  • ACID, relational integrity
  • Complex queries, JOINs
  • JSONB — kerak bo'lsa NoSQL
  • Full-text search built-in
  • Vector support (pgvector)
  • Mature, 40+ yosh ecosystem
NoSQL — specific cases
  • MongoDB: schema-flexible doc store
  • DynamoDB: massive scale key-value
  • Cassandra: write-heavy time-series
  • Redis: cache, pubsub, real-time
  • Neo4j: graph relationships

Rule of thumb 2026: PostgreSQL'dan boshlang. 90% SaaS muammolari unda hal bo'ladi. Limit'ga urilgandagina specialize qiling.

Sync vs Async

Use case             | Sync               | Async
---------------------|--------------------|---------------------------
REST API request     | ✓ default          | Faqat I/O bound bo'lsa
Database query       | Oddiy, ishonchli   | Ko'p concurrent request
File upload/download | Sekin, block qiladi | ✓ ishlatilishi shart
External API calls   | Bir necha sekund   | ✓ parallel, tez
Email, SMS           | User kutsin? Yo'q  | ✓ queue'ga
Reports, exports     | Timeout risk       | ✓ background job
Real-time push       | Imkonsiz           | ✓ WebSocket/SSE

Build vs Buy

Build qilish mantiqan to'g'ri:
  • Core business logic (sizning raqobat afzalligingiz)
  • Unique requirements — tayyor yechim mavjud emas
  • Licensing restrictions

Buy (yoki open-source) mantiqan to'g'ri:
  • Auth (Auth0, Clerk, Supabase Auth)
  • Analytics (PostHog, Amplitude)
  • Payments (Stripe, Paddle)
  • Email (Resend, Postmark)
  • Monitoring (Datadog, Grafana Cloud)
  • LLM hosting (managed API)
  • Vector DB (agar scale kerak bo'lsa)

Rule: "Build differentiator, buy commodity"
17.2

backend engineer — learning roadmap

Qanday tartibda o'rganish kerak? Junior'dan senior'gacha yo'l.
javob

Level 1 — Foundation (0-12 oy)

Junior backend engineer

Tillar: 1 ta to'liq bilish (Python yoki TypeScript)
Framework: 1 ta production-grade (FastAPI, Express, Django)
Database: PostgreSQL fundamentals, SQL yaxshi
HTTP: REST API, JSON, status codes, cookies
Tools: Git, Docker basics, Linux commands
Testing: unit test yozish

Level 2 — Professional (1-3 yil)

Mid-level

DB deep: indexing, query tuning, transactions, locks
Caching: Redis patterns, cache invalidation
Async: Celery/queues, event-driven basics
Auth: OAuth, JWT, RBAC
Observability: logs, metrics, basic tracing
K8s basics: pod, deployment, service
CI/CD: pipeline yozish
AI basics: LLM API ishlatish, prompt engineering

Level 3 — Senior (3-6 yil)

Senior engineer

System design: mikroservis, event sourcing, CQRS
DB advanced: sharding, replication, multi-region
Distributed: consistency, CAP, Saga pattern
Performance: profiling, load testing
Security: threat modeling, zero trust
Cloud: AWS/GCP services deep
AI: RAG production, fine-tuning, agents
Leadership: mentoring, tech decisions, ADRs

Level 4 — Staff/Principal (6+ yil)

Staff+ engineer

Technical strategy: multi-year architecture vision
Cross-team impact: butun kompaniya darajasida
Business sense: tech qarorlarning biznes ta'siri
Specialized depth: biror bir sohada top expert (distributed systems, AI infrastructure, security, etc.)
Mentoring at scale: senior'larni yetishtirish

AI-specific track — zamonaviy yo'l

Backend engineer bo'lib tursangiz ham, AI exposure shart. Bu bo'limlarni alohida o'rganing:

  1. LLM fundamentals — qanday ishlaydi, tokens, context, cost
  2. RAG patterns — naive → hybrid → rerank → contextual
  3. Vector databases — pgvector'dan boshlang, keyin Qdrant
  4. Agent frameworks — LangGraph, CrewAI, raw function calling
  5. MCP protocol — Anthropic'ning universal tool protocol
  6. Fine-tuning basics — QLoRA, Ollama local deploy
  7. LLMOps — Langfuse, evaluation, cost monitoring

Resources — tavsiyalar

Category    | Resource
------------|------------------------------------------------------------
Books       | "Designing Data-Intensive Applications" — Kleppmann (must)
Books       | "System Design Interview" — Alex Xu, vol 1 & 2
Books       | "Building LLM Applications" — Chip Huyen (2024)
Courses     | High Growth Engineer, ByteByteGo, DeepLearning.AI
Blogs       | Martin Fowler, Hussein Nasser, Pragmatic Engineer
Practice    | GitHub open-source, LeetCode (algorithms), own projects
AI-specific | Anthropic docs, Langfuse blog, HuggingFace courses
Yakuniy maslahat

AI backend'ni almashtirmaydi, lekin backend engineer'ni o'zgartiradi. Faqat CRUD API yozadigan dev bozordan tushib qoladi. AI-native architect — biznes muammosini AI bilan hal qila oladigan muhandis — uchun esa bo'sh vakansiyalar ko'payib boradi. Prompt yozish, RAG pipeline qurish, agent orchestration — bular yangi "normal" ko'nikmalar.

17.3

umumiy tamoyillar — yashash qoidalari

yakun
Architect'ning yadrosi

Texnikalar o'zgaradi — tamoyillar qoladi. Bu yerda 2026-da hali ham haqiqiy qolgan principles.

  1. Simple > clever. Oddiy kod — oson debug, oson qayta yozish. Aqlli kod 6 oydan keyin sizning o'zingizga tushunarsiz.
  2. Boring technology wins. PostgreSQL 30 yoshda. Redis 15 yoshda. Ishlatilgan, ishonchli. Har yili yangi trend texnologiya — ehtiyot bo'ling.
  3. Premature optimization is the root of all evil. Measure first, optimize after. Lekin loyihani boshidan noto'g'ri qurmang — arxitektura muhim.
  4. Fail fast, fail loudly. Silent failure — eng xavfli. Xato — tez sezilib, alert bo'lsin.
  5. Idempotency is a superpower. Har operatsiya ikki marta bajarilsa ham xavfsiz bo'lsin. Retries, replays, chaos — hammasi oson bo'ladi.
  6. Observability before optimization. Qayerda sekin bo'layotganini bilmasdan optimize qilish — ko'z yumib otish.
  7. Security is not a feature. Har layer'da o'ylanishi kerak. Defense in depth.
  8. Cost is architecture. Cloud cost = design choice. $10k/oy'lik system $1k/oy'gacha tushirilishi mumkin (tahlil + re-architect).
  9. Write for humans. Kod boshqa odamlar uchun yoziladi. Keyinchalik siz o'zingiz — o'sha "boshqa odam".
  10. Tests are documentation. Yaxshi test — kod qanday ishlatilishini ko'rsatadi.
  11. Deletion is better than addition. Kamroq kod — kamroq bug, kamroq maintenance.
  12. YAGNI — You Aren't Gonna Need It. Kelajakdagi "ehtimol kerak bo'ladi" fichersi uchun kod yozmang.
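5-tamoyil (idempotency) amalda qanday ko'rinishini ko'rsatuvchi minimal eskiz — PaymentService nomi va tuzilishi shartli, g'oya esa umumiy: bir xil idempotency key bilan kelgan retry hech qachon side-effect'ni ikkinchi marta bajarmaydi:

```python
class PaymentService:
    """Idempotency key — retry xavfsizligining kaliti"""
    def __init__(self):
        self.results: dict[str, dict] = {}  # key → oldingi natija
        self.charges = 0                    # haqiqiy to'lovlar soni

    def charge(self, idempotency_key: str, amount: int) -> dict:
        if idempotency_key in self.results:
            # Retry — operatsiyani takrorlamaymiz, eski natijani qaytaramiz
            return self.results[idempotency_key]
        self.charges += 1  # side-effect faqat bir marta
        result = {"status": "ok", "amount": amount}
        self.results[idempotency_key] = result
        return result

svc = PaymentService()
first = svc.charge("req-abc", 100)
retry = svc.charge("req-abc", 100)  # network retry — xavfsiz, pul qayta yechilmaydi
```

Production'da `results` dict o'rniga DB yoki Redis (TTL bilan) ishlatiladi — g'oya o'zgarmaydi.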

"Any fool can write code that a computer can understand.
Good programmers write code that humans can understand."

— Martin Fowler

Part VI

system design patterns

Real interview va production'da uchraydigan klassik system design muammolari. Har biri: requirements → scale estimation → HLD diagram → deep dive → trade-offs. URL shortener, chat, news feed, video streaming — bularni tushungan muhandis istalgan tizimni loyihalashi mumkin.

18

system design — interview & production patterns

18.1

url shortener — design bit.ly / tinyurl

Millionlab foydalanuvchi uchun URL qisqartirish xizmatini qanday loyihalash kerak?
javob
Nima uchun bu muammo muhim?

URL shortener — ko'rinishda sodda, lekin ichida jiddiy muhandislik bor. 100:1 read/write nisbati — 1 URL qo'shiladi, 100 marta bosiladi. Sub-millisecond redirect kerak (user kutmaydi). 100 milliard+ URL saqlanishi mumkin. Bu muammo scalability, caching, hashing va consistency bo'yicha hamma narsani sinovdan o'tkazadi.

Requirements va Scale Estimation

Functional requirements:
  • Uzun URL → qisqa URL (7 belgi, base62)
  • Qisqa URL → redirect (301 Permanent yoki 302 Found)
  • URL expiry (ixtiyoriy, masalan 30 kun)
  • Analytics (necha marta bosilgani)

Non-functional:
  • 100M yangi URL/kun yaratish
  • 10B redirect/kun (100:1 read:write)
  • 99.99% uptime (SLA)
  • Redirect latency < 10ms (p99)

Back-of-envelope:
  Writes:  100M/day ≈ 1,160/sec
  Reads:   10B/day ≈ 116,000/sec (peak 300k/sec)
  Storage: 7 bytes (key) + 200 bytes (URL) + metadata ≈ 500 bytes/URL
  5 yil saqlasak: 100M × 365 × 5 × 500B ≈ 90TB
  Cache (20% hot URLs): ~18TB
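Yuqoridagi back-of-envelope raqamlarni Python'da tekshirib ko'rish mumkin (barcha kirish qiymatlari yuqoridagi taxminlardan olingan):

```python
writes_per_day = 100_000_000        # 100M yangi URL/kun
reads_per_day = 10_000_000_000      # 10B redirect/kun
bytes_per_url = 500                 # key + URL + metadata

writes_per_sec = writes_per_day / 86_400   # ≈ 1,157/sec
reads_per_sec = reads_per_day / 86_400     # ≈ 115,741/sec
storage_5y_tb = writes_per_day * 365 * 5 * bytes_per_url / 1e12  # ≈ 91 TB
cache_tb = storage_5y_tb * 0.20            # 20% hot URL'lar ≈ 18 TB
```

Read:write nisbati 100:1 bo'lgani uchun butun dizayn read path'ni optimallashga qaratilgan — shuning uchun cache birinchi o'rinda.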

High-Level Architecture

CLIENT (Browser/App) → LOAD BALANCER → Write API (POST /shorten) / Read API (GET /{shortCode})
Storage: Redis cache (~18TB) · Cassandra NoSQL DB (~90TB) · Snowflake ID (64-bit unique ID) · Analytics (Kafka → Flink)
Write flow: POST /shorten → ID generate → Base62 → DB + Cache
Read flow: GET /abc123 → Cache hit (80%) → DB fallback → 301 redirect

Key Decision: Base62 encoding

Nima uchun Base62 va 7 ta belgi?

62 ta belgi (a-z, A-Z, 0-9) dan 7 ta pozitsiya bilan 62⁷ = 3.5 trillion kombinatsiya — 100 yilga yetarli (100M URL/kun hisobida). MD5/SHA hash ishlatsak — collision imkoniyati bor. Snowflake ID (Twitter-dan) — distributed, time-sorted, collision-free 64-bit integer → Base62 encode qilamiz. Bu yondashuv ham global unique, ham sortable (analytics uchun qulay).

url_shortener.py
import string
import time
import random

BASE62 = string.ascii_letters + string.digits  # 62 ta belgi

# Snowflake-inspired ID generator
class SnowflakeIDGenerator:
    EPOCH = 1700000000000          # ms, custom epoch
    DATACENTER_BITS = 5
    MACHINE_BITS = 5
    SEQUENCE_BITS = 12

    def __init__(self, datacenter_id: int, machine_id: int):
        self.datacenter_id = datacenter_id
        self.machine_id = machine_id
        self.sequence = 0
        self.last_timestamp = -1

    def next_id(self) -> int:
        ts = int(time.time() * 1000) - self.EPOCH
        if ts == self.last_timestamp:
            self.sequence = (self.sequence + 1) & 4095  # 12 bit
            if self.sequence == 0:
                while ts <= self.last_timestamp:
                    ts = int(time.time() * 1000) - self.EPOCH
        else:
            self.sequence = 0
        self.last_timestamp = ts

        # 41 bit ts | 5 bit datacenter | 5 bit machine | 12 bit seq
        return (ts << 22) | (self.datacenter_id << 17) | (self.machine_id << 12) | self.sequence

def encode_base62(num: int) -> str:
    """int → Base62 string (kamida 7 belgi)"""
    chars = []
    while num > 0:
        chars.append(BASE62[num % 62])
        num //= 62
    # Padding BASE62[0] bilan — '0' belgisi bilan emas ('0' indexi 52,
    # zfill ishlatilsa decode_base62 noto'g'ri qiymat qaytaradi)
    return ''.join(reversed(chars)).rjust(7, BASE62[0])

def decode_base62(code: str) -> int:
    num = 0
    for c in code:
        num = num * 62 + BASE62.index(c)
    return num

# FastAPI endpoint
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import RedirectResponse
import redis.asyncio as aioredis

app = FastAPI()
redis_client = aioredis.from_url("redis://localhost")
id_gen = SnowflakeIDGenerator(datacenter_id=1, machine_id=1)

@app.post("/shorten")
async def shorten(long_url: str, user_id: str = "anonymous", ttl_days: int = 30):
    # Rate limit check (token bucket from 1.3 bo'limi)
    allowed, remaining = await check_rate_limit(user_id)
    if not allowed:
        raise HTTPException(429, "Rate limit exceeded")

    short_id = encode_base62(id_gen.next_id())

    # Redis'da saqlash (primary fast path)
    await redis_client.setex(
        f"url:{short_id}",
        ttl_days * 86400,
        long_url
    )

    # DB'ga async yozish (Cassandra — eventual consistency OK)
    await db.insert_url(short_id=short_id, long_url=long_url,
                        user_id=user_id, created_at=time.time(),
                        expires_at=time.time() + ttl_days * 86400)

    return {"short_url": f"https://bit.ly/{short_id}", "short_id": short_id}

@app.get("/{short_id}")
async def redirect(short_id: str, request: Request):
    # 1. Cache'dan qidirish (hit rate ~80%)
    long_url = await redis_client.get(f"url:{short_id}")

    if not long_url:
        # 2. DB'dan qidirish
        record = await db.get_url(short_id)
        if not record or record.expires_at < time.time():
            raise HTTPException(404, "URL not found or expired")
        long_url = record.long_url
        # Cache'ga qo'shish (cache-aside pattern)
        await redis_client.setex(f"url:{short_id}", 86400, long_url)

    # Analytics event async (Kafka)
    await kafka.send("url.clicked", {
        "short_id": short_id, "timestamp": time.time(),
        "user_agent": request.headers.get("user-agent"),
        "ip": request.client.host
    })

    # 301 Permanent (browser caches) yoki 302 Found (har safar serverga)
    # Analytics kerak bo'lsa 302, SEO uchun 301
    return RedirectResponse(url=long_url, status_code=302)

Trade-offs: 301 vs 302 Redirect

Aspekt                   | 301 Permanent                          | 302 Found
-------------------------|----------------------------------------|------------------------------
Browser cache'laydimi?   | Ha — keyingi so'rov serverga bormaydi  | Yo'q — har safar serverga
Server load              | Juda past (browser cache)              | Har click serverga keladi
Analytics                | Imkonsiz (browser cache'lagan)         | Har click log'lanadi
URL o'zgarishi mumkinmi? | Yo'q (cached forever)                  | Ha (server qaytargan URL)
Tavsiya                  | Analytics kerak emas, max performance  | Click tracking kerak bo'lsa
Production pattern — Cassandra yoki DynamoDB

URL shortener uchun NoSQL to'g'ri tanlov: bitta access pattern — short_id → long_url, JOIN kerak emas. Cassandra'da short_id partition key bo'lsa — millisecond lookup, horizontally scalable, geo-distributed. DynamoDB'da single-table design bilan ham xuddi shunday.

18.2

chat system — design whatsapp / telegram

Real-time chat tizimini qanday loyihalash kerak — 1 milliard foydalanuvchi uchun?
javob
Chat nima uchun qiyin?

Oddiy HTTP request-response model ishlamaydi — xabar kelganda server clientga push qilishi kerak (server-initiated). Bu "barcha connection'ni qaysi server ushlab turadi" muammosini keltirib chiqaradi: 1B user, har biri online = 1B WebSocket connection — bitta serverda imkonsiz, distributed bo'lishi kerak. Bundan tashqari: offline delivery (user offline bo'lsa xabar saqlanadi), group chat fanout (bitta xabar N ta odamga), end-to-end encryption, "typing" indikatorlari, "read receipts" — bularning barchasi aslida murakkab distributed systems muammolari.

Protocol tanlash: WebSocket vs Long Polling vs SSE

Protocol     | Direction             | Latency   | Chat uchun
-------------|-----------------------|-----------|--------------------------------------------
WebSocket    | Bi-directional        | <10ms     | ✅ Eng yaxshi — real-time, persistent
Long Polling | Unidirectional (pull) | 100-500ms | ⚠️ Faqat fallback sifatida
SSE          | Server→Client only    | <50ms     | ⚠️ Faqat receive; yuborish uchun HTTP kerak
HTTP/2 Push  | Server→Client         | <50ms     | ⚠️ Browser-only, limited support
ALICE (Mobile) → Chat Server 1 (WS connections) → Kafka (Message Queue) → Chat Server 2 (WS connections) → BOB (Mobile)
Qo'shimcha komponentlar: User Presence — Redis (online/offline) · Cassandra — message history · Push Service — FCM/APNs
Bob offline bo'lsa → Push (FCM/APNs). Online bo'lsa → WebSocket delivery.

Message delivery qanday kafolatlanadi?

At-least-once delivery va duplicate handling

Network ishonchsiz — xabar yuborildi, lekin ACK kelmasligi mumkin. Client retry qiladi — xabar ikki marta yetib kelishi mumkin. Yechim: har xabarga client_message_id (UUID yoki timestamp+random) qo'shiladi. Server yoki recipient bu ID'ni avval ko'rgan bo'lsa, duplicate deb e'tiborsiz qoldiradi (idempotent delivery). WhatsApp'dagi "bitta tick" = serverga yuborildi, "ikkita tick" = recipient'ga delivered, "ko'k ikkita tick" = o'qildi.
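Duplicate handling g'oyasini in-memory eskiz bilan ko'rsatish mumkin (production'da "seen" to'plami Redis'da SET NX + TTL bilan saqlanadi; sinf va kalit nomlari shartli):

```python
class IdempotentDelivery:
    def __init__(self):
        # (recipient, client_msg_id) juftliklari — ko'rilgan xabarlar
        self.seen: set[tuple[str, str]] = set()
        self.delivered: list[dict] = []

    def deliver(self, msg: dict) -> bool:
        key = (msg["to"], msg["client_msg_id"])
        if key in self.seen:
            return False  # duplicate — retry'dan kelgan nusxa, tashlanadi
        self.seen.add(key)
        self.delivered.append(msg)
        return True

d = IdempotentDelivery()
msg = {"to": "bob", "client_msg_id": "a1b2", "text": "salom"}
ok1 = d.deliver(msg)   # birinchi urinish — yetkaziladi
ok2 = d.deliver(msg)   # network retry — e'tiborsiz qoldiriladi
```

Shu tufayli client bemalol "ACK kelmaguncha retry qil" strategiyasini ishlatadi — at-least-once transport + idempotent receiver = amalda exactly-once effekt.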

chat_server.py
import uuid
from datetime import datetime

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import redis.asyncio as aioredis

app = FastAPI()
redis = aioredis.from_url("redis://localhost")

# Connection registry — server ichida
# Production'da Redis Pub/Sub yoki Kafka bilan
class ConnectionManager:
    def __init__(self):
        self.active: dict[str, WebSocket] = {}  # user_id → ws

    async def connect(self, user_id: str, ws: WebSocket):
        await ws.accept()
        self.active[user_id] = ws
        # User online deb belgilash
        await redis.set(f"presence:{user_id}", "online", ex=30)

    def disconnect(self, user_id: str):
        self.active.pop(user_id, None)

    async def send_to(self, user_id: str, message: dict) -> bool:
        ws = self.active.get(user_id)
        if ws:
            try:
                await ws.send_json(message)
                return True
            except Exception:
                self.disconnect(user_id)
        return False

manager = ConnectionManager()

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(ws: WebSocket, user_id: str):
    await manager.connect(user_id, ws)
    try:
        # Offline vaqtdagi xabarlarni yetkazish
        pending = await get_pending_messages(user_id)
        for msg in pending:
            await ws.send_json(msg)
            await mark_delivered(msg["id"])

        while True:
            data = await ws.receive_json()
            await handle_message(user_id, data)
    except WebSocketDisconnect:
        manager.disconnect(user_id)
        await redis.delete(f"presence:{user_id}")

async def handle_message(sender_id: str, data: dict):
    msg = {
        "id": str(uuid.uuid4()),
        "from": sender_id,
        "to": data["to"],
        "text": data["text"],
        "timestamp": datetime.utcnow().isoformat(),
        "status": "sent",
        "client_msg_id": data.get("client_msg_id"),  # dedup uchun
    }

    # 1. Cassandra'ga saqlash (message_history)
    await save_message(msg)

    # 2. Recipient onlinemi?
    is_online = await redis.get(f"presence:{msg['to']}")

    if is_online:
        # 3a. Direct delivery (agar bu serverda)
        delivered = await manager.send_to(msg["to"], msg)
        if not delivered:
            # 3b. Boshqa chat serverda — Kafka orqali
            await kafka.send("messages", msg)
    else:
        # 3c. Push notification
        await push_service.notify(
            user_id=msg["to"],
            title=f"Yangi xabar: {sender_id}",
            body=msg["text"][:50]
        )
18.3

news feed — design twitter/instagram feed

Social media feed'ni qanday loyihalash kerak? Fanout-on-write vs fanout-on-read?
javob
Feed nima uchun murakkab?

Elon Musk 170M follower'ga tweet qiladi. 170M odamning feed'ini yangilash kerak. Bitta tweet → 170M yozuv. Bu fanout muammosi — bir yozuvdan milliardlab natija. Instagram, Twitter yillarca shu muammoni hal qilishga harakat qildi. Yechim: hybrid approach — oddiy foydalanuvchilar uchun "fanout-on-write" (post qilganda hammaning feed'ini yangilaymiz), celebrity'lar uchun "fanout-on-read" (feed ochilganda celebrity postlarini pull qilamiz).

FANOUT-ON-WRITE (ordinary users, <10K followers):
  USER post qildi → Fan-out Service → Feed A / Feed B / Feed C — feed Redis'da tayyor (read juda tez)

FANOUT-ON-READ (celebrities, 170M followers):
  CELEBRITY (Elon Musk) → Post Store (Cassandra); USER feed ochganda celebrity postlar pull qilinadi — write'da 170M yozuv yo'q!

HYBRID: <10K followers → write fanout; >10K followers → read fanout (celebrity threshold)
feed_service.py
CELEBRITY_THRESHOLD = 10_000  # follower soni

async def create_post(user_id: str, content: str) -> dict:
    post = await db.insert_post(user_id=user_id, content=content,
                                created_at=time.time())

    follower_count = await get_follower_count(user_id)

    if follower_count < CELEBRITY_THRESHOLD:
        # Fanout-on-write: barcha follower feed'ini yangilaymiz
        # Async Celery task orqali (HTTP request'ni bloklamaymiz)
        fanout_to_followers.delay(user_id=user_id, post_id=post.id)
    else:
        # Celebrity: faqat post saqlanadi, fan pull qiladi
        # Hech narsa qilmaymiz — read vaqtida merge qilamiz
        pass

    return post

async def get_feed(user_id: str, page: int = 1) -> list:
    # 1. User'ning precomputed feed'idan post ID'larni Redis'dan olamiz
    feed_key = f"feed:{user_id}"
    post_ids = await redis.lrange(feed_key, (page-1)*20, page*20-1)
    feed_posts = await db.get_posts_by_ids(post_ids)  # ID → post object

    # 2. User follow qilgan celebrity'lar postlarini merge qilamiz
    celebrities = await get_followed_celebrities(user_id)
    celebrity_posts = []
    for celeb_id in celebrities:
        posts = await db.get_recent_posts(user_id=celeb_id, limit=10)
        celebrity_posts.extend(posts)

    # 3. Merge, deduplicate, sort by timestamp
    all_posts = feed_posts + celebrity_posts
    all_posts.sort(key=lambda p: p["created_at"], reverse=True)
    return all_posts[:20]

@celery_app.task
async def fanout_to_followers(user_id: str, post_id: str):
    """Background task: 1000 ta follower uchun ham tez ishlashi kerak"""
    followers = await get_followers_paginated(user_id, batch_size=100)
    pipeline = redis.pipeline()
    for follower_id in followers:
        # Redis list'ga prepend (lpush), max 200 post saqlash (ltrim)
        pipeline.lpush(f"feed:{follower_id}", post_id)
        pipeline.ltrim(f"feed:{follower_id}", 0, 199)
    await pipeline.execute()
18.4

consistent hashing — distributed data routing

Consistent hashing nima va Redis cluster, Cassandra, CDN'larda qanday ishlatiladi?
javob
Oddiy modulo hashing muammosi

4 ta server bor. Key uchun server: server = hash(key) % 4. Hammasi yaxshi — to'rtinchi server o'chdi. Endi: hash(key) % 3. Deyarli barcha key'lar boshqa serverlarga ko'chib ketadi — 75% cache miss, DB'ga hamma bir vaqtda hujum. Consistent hashing buni hal qiladi: server qo'shilganda yoki o'chirilganda faqat keylarning bir qismi ko'chadi.
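Bu "deyarli barcha key'lar ko'chadi" da'vosini kichik eksperiment bilan tekshirish mumkin (kalitlar faraziy; md5 deterministik bo'lgani uchun natija barqaror):

```python
import hashlib

def h(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

keys = [f"user:{i}" for i in range(10_000)]
before = {k: h(k) % 4 for k in keys}   # 4 ta server
after = {k: h(k) % 3 for k in keys}    # bittasi o'chdi → endi 3 ta

moved = sum(1 for k in keys if before[k] != after[k])
moved_pct = moved / len(keys)          # odatda ~0.75 atrofida
```

Nazariy jihatdan ham shunday: hash % 4 == hash % 3 faqat hash % 12 ∈ {0, 1, 2} bo'lganda, ya'ni key'larning atigi 3/12 = 25% qismi joyida qoladi — 75% ko'chadi.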

Ring misoli: S1 (0°), S2 (90°), S3 (180°), S4 (270°) — serverlar hash ring'da joylashgan.
Key'lar: K1 → S2 (soat yo'nalishi bo'yicha keyingi server), K2 → S3, K3 → S1.
Yangi S5 qo'shilsa: faqat S1→S5 oralig'idagi key'lar ko'chadi.
Consistent Hashing: key hash → ring'da joylashadi; soat yo'nalishi bo'yicha keyingi server = owner.
Virtual Nodes (vnodes): har server ring'da 150+ joyda — load balancing aniqroq bo'ladi.
consistent_hash.py
import hashlib
from sortedcontainers import SortedDict

class ConsistentHashRing:
    def __init__(self, virtual_nodes: int = 150):
        self.virtual_nodes = virtual_nodes
        self.ring: SortedDict = SortedDict()
        self.servers: set = set()

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_server(self, server: str):
        self.servers.add(server)
        for i in range(self.virtual_nodes):
            vnode_key = self._hash(f"{server}:vnode:{i}")
            self.ring[vnode_key] = server

    def remove_server(self, server: str):
        self.servers.discard(server)
        for i in range(self.virtual_nodes):
            vnode_key = self._hash(f"{server}:vnode:{i}")
            self.ring.pop(vnode_key, None)

    def get_server(self, key: str) -> str | None:
        if not self.ring:
            return None
        key_hash = self._hash(key)
        # Soat yo'nalishi bo'yicha keyingi server
        idx = self.ring.bisect_left(key_hash)
        if idx == len(self.ring):
            idx = 0  # ring — aylana, oxiridan boshiga
        return self.ring.peekitem(idx)[1]

    def get_replica_servers(self, key: str, n: int = 3) -> list[str]:
        """N ta unique server (replication uchun)"""
        if not self.ring or len(self.servers) < n:
            return list(self.servers)

        key_hash = self._hash(key)
        idx = self.ring.bisect_left(key_hash)
        seen = set()
        result = []

        for _ in range(len(self.ring)):
            if idx >= len(self.ring):
                idx = 0
            server = self.ring.peekitem(idx)[1]
            if server not in seen:
                seen.add(server)
                result.append(server)
                if len(result) == n:
                    break
            idx += 1
        return result

# Ishlatish:
ring = ConsistentHashRing(virtual_nodes=150)
ring.add_server("cache-1:6379")
ring.add_server("cache-2:6379")
ring.add_server("cache-3:6379")

key = "user:12345:profile"
server = ring.get_server(key)                   # key'ga mas'ul server
replicas = ring.get_replica_servers(key, n=2)   # replication uchun 2 ta unique server
Real production'da ishlatiladi

Apache Cassandra — consistent hashing + virtual nodes asosida data distribution. Redis Cluster — 16384 hash slot (bir xil g'oya, lekin slot-based). Amazon DynamoDB — ichida consistent hashing. Nginx/HAProxy upstream hashing — session affinity uchun.
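Redis Cluster'dagi slot-based variantni ham kichik sketch'da ko'rsatish mumkin. Quyidagi kod — Redis ishlatadigan CRC16 (XMODEM) va hash-tag qoidasining soddalashtirilgan versiyasi (real client kutubxonasi emas, faqat g'oyani ko'rsatish uchun):

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC16/XMODEM — Redis Cluster key'ni slot'ga taqsimlashda ishlatadigan CRC"""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) if (crc & 0x8000) else (crc << 1)
            crc &= 0xFFFF
    return crc

def key_slot(key: str) -> int:
    """Hash tag qoidasi: key ichida bo'sh bo'lmagan '{...}' bo'lsa,
    faqat shu qism hash'lanadi — bir guruh key bir slot'ga tushadi"""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end != -1 and end > start + 1:
            key = key[start + 1:end]
    return crc16_xmodem(key.encode()) % 16384

# Bir xil hash tag — bir xil slot (multi-key operatsiyalar uchun muhim)
print(key_slot("{user:1000}:orders"), key_slot("{user:1000}:cart"))
```

Hash tag tufayli `{user:1000}:orders` va `{user:1000}:cart` bitta node'ga tushadi — MGET/transaction'lar shu node ichida ishlay oladi.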

19

ai architecture — advanced patterns

19.1

graphrag — knowledge graph + rag

Oddiy RAG nima uchun yetmaydi va GraphRAG qanday muammoni hal qiladi?
javob
Oddiy RAG'ning eng katta muammosi — multi-hop reasoning

Savol: "Yangi soliq qonuni bizning Yevropa ta'minot zanjirimizga qanday ta'sir qiladi?" — bu savolga javob 50 ta turli hujjatda tarqalgan: soliq qonuni hujjati, Yevropa shartnomalar, ta'minot zanjiri ma'lumotlari, logistika narxlari, risklar... Oddiy RAG eng o'xshash 5 ta chunk'ni topadi — ammo bu savolga javob berish uchun ularni bog'lab, sabab-natija zanjirini tushunish kerak. Vektor o'xshashligi bundan ojiz.

GraphRAG Microsoft tomonidan 2024-yilda open-source qilingan. G'oya: matnlardan entity va munosabatlar grafini qurish (Apple → CEO → Tim Cook, Tim Cook → born_in → Alabama). Savol kelganda vector search + graph traversal qo'shiladi — "multi-hop" reasoning mumkin bo'ladi.

Oddiy RAG vs GraphRAG — qachon qaysi?

Oddiy RAG yetarli: savol to'g'ridan-to'g'ri javobli ("ushbu shartnomada narx qancha?"), chunklarda mustaqil javob bor. GraphRAG kerak: munosabatlar muhim ("kompaniya kim bilan hamkor, bu hamkor kimga sotadi, u kim bilan bog'liq?"), "taqqoslash" savollari ("A va B kompaniya strategiyalari qanday farq qiladi?"), hierarchical summarization (butun corpus haqida savol).

[Diagramma — Knowledge Graph (Entities + Relationships): Apple Inc. (COMPANY), Tim Cook (PERSON/CEO), TSMC (SUPPLIER), iPhone 17 (PRODUCT), A18 Pro Chip (COMPONENT), Taiwan (LOCATION); munosabatlar: CEO_OF, makes, contains, made_by, in, supplies_to.]

Multi-hop query misoli. Savol: "Apple'ning asosiy chip yetkazib beruvchisi qaysi mamlakatda va u yerda siyosiy risklar bormi?" Graf traversal: Apple → makes → iPhone → contains → A18 Pro → made_by → TSMC → in → Taiwan. Keyin Taiwan + "siyosiy risk" hujjatlarini vector search bilan birlashtirish → to'liq javob. Oddiy RAG shu multi-hop zanjirni o'z-o'zicha topa olmaydi!
graphrag_pipeline.py
"""
GraphRAG pipeline: Document → Entity extraction → Graph → Hybrid retrieval
"""
from anthropic import AsyncAnthropic
from neo4j import AsyncGraphDatabase
import asyncio

client = AsyncAnthropic()

# ── 1. INDEXING: Entity va munosabatlarni chiqarib olish ──
EXTRACTION_PROMPT = """
Quyidagi matndan entity va munosabatlarni JSON formatda chiqar.

Format:
{
  "entities": [{"id": "apple", "type": "COMPANY", "name": "Apple Inc.", "props": {...}}],
  "relationships": [{"from": "tim_cook", "to": "apple", "type": "CEO_OF", "props": {...}}]
}

Faqat JSON qaytargin, boshqa narsa yo'q.

Matn: {text}
"""

async def extract_entities(text: str) -> dict:
    import json
    # Diqqat: prompt ichida JSON namunasi bor — str.format() undagi {} qavslarni
    # placeholder deb qabul qilib xato beradi. Shuning uchun {text} ni
    # .replace() bilan almashtiramiz.
    prompt = EXTRACTION_PROMPT.replace("{text}", text)
    response = await client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2000,
        messages=[{"role": "user", "content": prompt}]
    )
    return json.loads(response.content[0].text)

async def index_document(doc_text: str, doc_id: str, neo4j_session):
    """Hujjatni graph'ga qo'shish"""
    extracted = await extract_entities(doc_text)

    # Neo4j'ga entity'lar qo'shish
    # Eslatma: Cypher'da label'ni parametr qilib bo'lmaydi (SET e:$type ishlamaydi),
    # shuning uchun label tekshirilib, query matniga to'g'ridan-to'g'ri qo'shiladi
    for entity in extracted["entities"]:
        label = entity["type"]
        if not label.isidentifier():
            continue  # injection'dan himoya: faqat toza label'lar o'tadi
        await neo4j_session.run(
            f"MERGE (e:Entity {{id: $id}}) SET e += $props SET e:{label}",
            id=entity["id"], props=entity.get("props", {})
        )

    # Munosabatlar qo'shish
    for rel in extracted["relationships"]:
        await neo4j_session.run(
            """
            MATCH (a:Entity {id: $from_id}), (b:Entity {id: $to_id})
            MERGE (a)-[r:RELATED {type: $rel_type}]->(b)
            SET r += $props
            """,
            from_id=rel["from"], to_id=rel["to"],
            rel_type=rel["type"], props=rel.get("props", {})
        )

# ── 2. QUERY: Hybrid Graph + Vector retrieval ──
async def hybrid_graphrag_query(question: str, neo4j_session, qdrant) -> str:
    # a) Savoldan entity'larni chiqar
    entities_in_q = await extract_entities(question)
    entity_ids = [e["id"] for e in entities_in_q.get("entities", [])]

    # b) Graph traversal — entity'lardan bog'liq tugunlarni topish (2 hop)
    graph_context = []
    if entity_ids:
        result = await neo4j_session.run(
            """
            MATCH (e:Entity)-[r*1..2]-(related:Entity)
            WHERE e.id IN $entity_ids
            RETURN e, r, related LIMIT 50
            """,
            entity_ids=entity_ids
        )
        graph_context = [record.data() async for record in result]

    # c) Vector search — semantic o'xshash chunk'lar
    q_embedding = await get_embedding(question)
    vector_results = await qdrant.search(
        collection_name="documents",
        query_vector=q_embedding,
        limit=5
    )

    # d) Ikkalasini birlashtirish va LLM'ga berish
    context = f"""
Graph Context (Munosabatlar):
{format_graph_context(graph_context)}

Vector Context (O'xshash hujjatlar):
{chr(10).join(r.payload['text'] for r in vector_results)}
"""

    response = await client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1500,
        system="Siz berilgan context asosida aniq va to'liq javob beradigan yordamchisiz.",
        messages=[{"role": "user", "content": f"Context:\n{context}\n\nSavol: {question}"}]
    )
    return response.content[0].text
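Yuqoridagi kod ishlatgan `format_graph_context` yordamchisi matnda berilmagan — quyida uning mumkin bo'lgan soddalashtirilgan varianti (Neo4j record'lari allaqachon dict ko'rinishiga keltirilgan deb faraz qilamiz, maydon nomlari shartli):

```python
def format_graph_context(records: list[dict]) -> str:
    """Graph natijalarini LLM o'qiy oladigan "A -[REL]-> B" qatorlariga aylantirish"""
    lines = []
    for rec in records:
        e = rec.get("e", {})
        related = rec.get("related", {})
        rels = rec.get("r", [])
        # Variable-length path'da r — munosabatlar ro'yxati
        rel_types = ", ".join(r.get("type", "RELATED") for r in rels) or "RELATED"
        lines.append(
            f"{e.get('name', e.get('id', '?'))} -[{rel_types}]-> "
            f"{related.get('name', related.get('id', '?'))}"
        )
    # Dublikat qatorlarni olib tashlash, tartib saqlanadi
    return "\n".join(dict.fromkeys(lines))

sample = [{"e": {"id": "apple", "name": "Apple Inc."},
           "r": [{"type": "MAKES"}],
           "related": {"id": "iphone", "name": "iPhone 17"}}]
print(format_graph_context(sample))
```

Bunday "triple" ko'rinish LLM uchun eng qulay format — munosabat yo'nalishi va turi aniq ko'rinib turadi.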
| Aspekt | Naive RAG | GraphRAG |
|---|---|---|
| Multi-hop reasoning | ❌ Yo'q | ✅ Graf traversal |
| Entity relationships | ❌ Chunk'lar izolyatsiyada | ✅ Explicit edges |
| Hallucination risk | Yuqori | Pastroq (traced sources) |
| Setup murakkabligi | Sodda | Yuqori (Neo4j + extraction) |
| Indexing narxi | Past | Yuqori (LLM call per doc) |
| Qachon tanlash | Single-hop QA, docs search | Complex relationships, compliance, supply chain |
19.2

ai memory systems — short, long, episodic

AI agent qanday "eslab qoladi"? Memory architecture qanday quriladi?
javob
LLM stateless — u hech narsani eslamaydi

Har yangi API call — yangi boshlanish. LLM oldingi conversation'ni bilmaydi (context window'ga qo'shmasangiz). Bu agentic AI'ning eng katta muammosi: agent kun bo'yi ishlab, kechqurun sessiya yopilsa — ertasi kuni hammasi unutiladi. AI Memory — agentga "xotira" berish tizimi. Inson xotirasi kabi 3 qavatli: qisqa muddatli (session), uzoq muddatli (facts, preferences), epizodik (nimalar qilgani).

[Diagramma — AI agent xotira arxitekturasi: markazda LLM Brain, atrofida 4 qatlam. Short-term: context window (conversation history, joriy task konteksti, ~128K token) — RAM'ga o'xshaydi. Long-term: Vector DB (Qdrant) — user preferences, o'rganilgan faktlar, skills. Episodic: action history — agent nima qilgani, natijalar, xatolar (Kafka/PostgreSQL). Semantic: knowledge graph — world knowledge, domain expertise (Neo4j/GraphRAG).]
agent_memory.py
"""
Production AI Memory System — 4 xotira turi
"""
from qdrant_client import AsyncQdrantClient
from datetime import datetime
import json

class AgentMemory:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.qdrant = AsyncQdrantClient("localhost", port=6333)
        self.short_term: list = []      # joriy session context
        self.max_short_term = 20        # max 20 ta turn

    # ── Short-term: conversation context ──
    def add_to_short_term(self, role: str, content: str):
        self.short_term.append({"role": role, "content": content})
        # Eski xabarlarni o'chirish (token limit uchun)
        if len(self.short_term) > self.max_short_term:
            # Birinchi ikkitasini (system) qoldirib, eng eski'larni o'chiramiz
            self.short_term = self.short_term[:2] + self.short_term[-self.max_short_term+2:]

    def get_short_term_context(self) -> list:
        return self.short_term.copy()

    # ── Long-term: vector DB'da saqlash ──
    async def remember(self, content: str, memory_type: str, metadata: dict | None = None):
        """Muhim ma'lumotni uzoq muddatli xotiraga yozish"""
        metadata = metadata or {}  # mutable default ({}) — Python'da klassik xato, None ishlatamiz
        embedding = await get_embedding(content)
        await self.qdrant.upsert(
            collection_name="agent_memory",
            points=[{
                "id": generate_uuid(),
                "vector": embedding,
                "payload": {
                    "agent_id": self.agent_id,
                    "content": content,
                    "type": memory_type,  # "fact", "preference", "skill"
                    "created_at": datetime.utcnow().isoformat(),
                    **metadata
                }
            }]
        )

    async def recall(self, query: str, memory_type: str = None, limit: int = 5) -> list:
        """Semantic qidirish — eng relevant xotiralarni qaytarish"""
        q_embedding = await get_embedding(query)
        filter_condition = {"must": [{"key": "agent_id", "match": {"value": self.agent_id}}]}
        if memory_type:
            filter_condition["must"].append({"key": "type", "match": {"value": memory_type}})

        results = await self.qdrant.search(
            collection_name="agent_memory",
            query_vector=q_embedding,
            query_filter=filter_condition,
            limit=limit,
            with_payload=True
        )
        return [r.payload for r in results]

    # ── Episodic: nima qilganini yozish ──
    async def log_action(self, action: str, result: str, success: bool):
        """Agent qilgan har bir ish yoziladi"""
        await db.insert({
            "agent_id": self.agent_id,
            "action": action,
            "result": result,
            "success": success,
            "timestamp": datetime.utcnow().isoformat()
        })
        # Xatoni uzoq muddatli xotiraga ham yozamiz (keyingi safar qaytarmaslik uchun)
        if not success:
            await self.remember(
                f"Xato: {action} → {result}",
                memory_type="error",
                metadata={"avoid_repeat": True}
            )

    # ── Full context builder ──
    async def build_context(self, current_query: str) -> str:
        """Agent uchun to'liq kontekst yig'ish"""
        # 1. Relevant uzoq muddatli xotiralar
        long_term = await self.recall(current_query, limit=3)
        # 2. Relevant xatolar (bir xil xatoni qaytarmasin)
        errors = await self.recall(current_query, memory_type="error", limit=2)

        context_parts = []
        if long_term:
            context_parts.append("Oldingi tajribalarim:\n" +
                "\n".join(f"- {m['content']}" for m in long_term))
        if errors:
            context_parts.append("Ilgari qilgan xatolar (takrorlamang):\n" +
                "\n".join(f"- {e['content']}" for e in errors))

        return "\n\n".join(context_parts) if context_parts else ""

# Ishlatish misoli:
memory = AgentMemory(agent_id="agent-001")

# Yangi ma'lumot o'rganildi
await memory.remember(
    "Foydalanuvchi Python'ni afzal ko'radi, JavaScript'dan qochadi",
    memory_type="preference"
)

# Vazifa bajarildi
await memory.log_action(
    action="Write unit tests for auth module",
    result="12 ta test yozildi, 11 ta pass, 1 ta fail (JWT expiry edge case)",
    success=False
)

# Yangi savol keldi — contextni yig'amiz
context = await memory.build_context("Auth module'da yangi feature qo'shish kerak")
# context ichida: preference (Python), xato (JWT expiry) bor
19.3

slm, edge ai va mixture of experts (moe)

Hamma GPT-4 ishlatish shart emas — SLM va MoE qanday ishlaydi?
javob
Katta model = har doim yaxshi emas

2024-2025'da muhim o'zgarish: "bigger is better" dogmasi buzildi. Phi-4 (14B) — Microsoft SLM, GPT-4 darajasida benchmark'larda, lekin 10x kichik. Gemma 3 (4B) — Google, smartphone'da ishlaydi. Llama 3.2 (1B-3B) — edge device'larda. Nega muhim? Kichik model: privacy (data server'ga chiqmaydi), latency (local = sub-50ms), cost (API narxi yo'q), offline (internet kerak emas).

MoE — Mixture of Experts — GPT-4 va Mixtral'ning siri

Klassik transformer: har token barcha 70B parametrdan o'tadi. MoE g'oyasi: 8 ta "expert" model bor (har biri 7B), har token uchun faqat 2 ta eng relevant expert aktivlashadi. Natijada: 56B umumiy parametr, lekin har qadamda faqat 14B ishlatiladi. GPT-4 ham MoE arxitekturasi (rasmiy tasdiqlanmagan, lekin keng ishoniladi). Mixtral 8x7B — birinchi open-source MoE, 56B param lekin 14B "active". Inference narxi 4x past, accuracy yaxshi.
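Top-k gating g'oyasini quyidagi o'yinchoq sketch ko'rsatadi — bu real transformer emas, faqat routing mexanizmining illyustratsiyasi (expert funksiyalari va gate score'lar shartli):

```python
import math

def softmax(xs: list[float]) -> list[float]:
    m = max(xs)
    exps = [math.exp(x - m) for x in xs]
    s = sum(exps)
    return [e / s for e in exps]

def moe_forward(x: float, expert_fns, gate_scores: list[float], top_k: int = 2) -> float:
    """Top-k routing: eng yuqori score'li k ta expert ishlaydi, qolganlari o'chiq"""
    probs = softmax(gate_scores)
    top = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)[:top_k]
    total = sum(probs[i] for i in top)
    # Faqat tanlangan expert'lar hisoblanadi — compute shu yerda tejaladi
    return sum((probs[i] / total) * expert_fns[i](x) for i in top)

# 4 ta "expert" — har biri oddiy funksiya (haqiqiy modelda FFN bo'lardi)
experts = [lambda x: x + 1, lambda x: x * 2, lambda x: x - 3, lambda x: x * 10]
gate_scores = [0.1, 2.0, 0.2, 1.5]   # router (gating network) chiqishi, shartli
out = moe_forward(5.0, experts, gate_scores, top_k=2)
```

Bu misolda router 2.0 va 1.5 score'li expert'larni (index 1 va 3) tanlaydi, vaznlar qayta normalizatsiya qilinib weighted sum olinadi — xuddi diagrammadagi "merge" qadamidek.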

[Diagramma — MoE routing: token (masalan, "yozish") ROUTER'ga (gating network) kiradi, u top-2 expertni tanlaydi — masalan Expert 3 (Creative/UZ lang) va Expert 4 (Translation); Expert 1 (Code/Logic), Expert 2 (Language) va 5-8 inactive qoladi. Tanlangan expert'lar chiqishi MERGE bosqichida weighted sum bilan birlashtiriladi → keyingi token. Mixtral 8x7B: jami 56B param, active 14B; tezlik ~4x Llama2-70B'ga nisbatan; sifat ≈ GPT-3.5; narx 4x arzon; litsenziya Apache 2.0.]

Model tanlash guide

| Model | Params | Context | Use case | Narx (1M tok) |
|---|---|---|---|---|
| Claude Opus 4.6 | ~200B+ | 200K | Complex reasoning, agentic | $15 input / $75 output |
| Claude Sonnet 4.6 | ~70B | 200K | Production workloads (best value) | $3 / $15 |
| GPT-4o mini | ~8B MoE | 128K | Simple tasks, high volume | $0.15 / $0.60 |
| Llama 3.3 (70B) | 70B | 128K | Self-hosted, privacy required | Infra narxi |
| Phi-4 (14B) | 14B | 16K | Edge, coding tasks, fast | Bepul (local) |
| Gemma 3 (4B) | 4B | 128K | Mobile, IoT, ultra-low latency | Bepul (local) |
| Mixtral 8x22B | 141B / 39B active | 64K | Self-host, high quality, MoE | Infra narxi |

Smart Model Router — 80% narxni tejash

Har savol ham GPT-4'ni talab qilmaydi

"Salom qanday?" — Phi-4 bilan javob bera oladi (0.001$). "Butun codebase'ni tahlil qilib bug topib ber" — Claude Opus kerak (0.50$). Savol murakkabligiga qarab model tanlash: classifier (kichik model) savolni baholaydi → routing qaroriga qarab mos model'ga yuboriladi. Production'da 80% savollar "easy" — kichik model bilan yopiladi, faqat 20% "hard" savollar katta modelga ketadi.

smart_router.py
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI
import re

client = AsyncAnthropic()
openai_client = AsyncOpenAI()

DIFFICULTY_CLASSIFIER_PROMPT = """
Quyidagi savolning murakkabligini baholang. Faqat JSON qaytaring:
{
  "difficulty": "easy" | "medium" | "hard",
  "reasoning": "bir jumlada sabab",
  "requires_code": true/false,
  "requires_reasoning": true/false
}

Easy: oddiy savol, faktual, qisqa javob
Medium: tushuntirish kerak, bir necha qadam
Hard: kompleks reasoning, katta kontekst, kod tahlil, multi-step

Savol: {question}
"""

async def classify_difficulty(question: str) -> dict:
    """Kichik model bilan savolni baholash"""
    import json
    # Prompt ichida JSON namunasi bor — str.format() undagi {} qavslarni
    # placeholder deb xato beradi, shuning uchun {question} ni .replace() bilan qo'yamiz
    prompt = DIFFICULTY_CLASSIFIER_PROMPT.replace("{question}", question)
    response = await openai_client.chat.completions.create(
        model="gpt-4o-mini",   # arzon classifier
        messages=[{"role": "user", "content": prompt}],
        max_tokens=100,
        temperature=0
    )
    return json.loads(response.choices[0].message.content)

async def smart_route(question: str, context: str = "") -> tuple[str, str]:
    """
    Returns: (answer, model_used)
    """
    classification = await classify_difficulty(question)
    difficulty = classification["difficulty"]
    needs_reasoning = classification.get("requires_reasoning", False)
    needs_code = classification.get("requires_code", False)

    # Routing mantiq
    if difficulty == "easy" and not needs_reasoning:
        # Eng arzon: GPT-4o mini ($0.15/1M tokens)
        model = "gpt-4o-mini"
        response = await openai_client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": question}],
            max_tokens=500
        )
        answer = response.choices[0].message.content

    elif difficulty == "medium" or needs_code:
        # O'rta narx: Claude Sonnet ($3/1M tokens)
        model = "claude-sonnet-4-6"
        response = await client.messages.create(
            model=model,
            max_tokens=1500,
            messages=[{"role": "user", "content": question}]
        )
        answer = response.content[0].text

    else:  # hard yoki complex reasoning
        # Eng qimmat: Claude Opus ($15/1M tokens) — faqat kerak bo'lganda
        model = "claude-opus-4-6"
        response = await client.messages.create(
            model=model,
            max_tokens=4000,
            system="Siz ekspert muhandissiz. Kompleks muammolarni chuqur tahlil qiling.",
            messages=[{"role": "user", "content": f"{context}\n\n{question}" if context else question}]
        )
        answer = response.content[0].text

    # Cost tracking (Langfuse yoki custom)
    await track_usage(model=model, tokens_used=estimate_tokens(question + answer),
                      difficulty=difficulty)

    return answer, model

# Natijalar:
# 80% savollar → gpt-4o-mini ($0.15/1M)
# 15% → claude-sonnet ($3/1M)
# 5% → claude-opus ($15/1M)
# O'rtacha narx: 0.80×0.15 + 0.15×3 + 0.05×15 ≈ $1.32/1M —
# opus-only ($15/1M) bilan solishtirganda ~11x tejash
AI engineering asosiy tamoyili

Har savol uchun eng katta modelni ishlatish — pul yo'qotish. Smart routing + caching + SLM-for-simple-tasks kombinatsiyasi bilan 80-90% narx tejash mumkin, foydalanuvchi sifati yo'qolmaydi. Bu eng muhim LLMOps pattern'laridan biri.
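Routing mix'ining ta'sirini oddiy arifmetika bilan baholash mumkin. Quyidagi sketch'dagi narxlar yuqoridagi jadvaldan (input narxi asosida), mix ulushlari esa shartli:

```python
# Model narxlari ($ / 1M token) — taxminiy, input narxi asosida
PRICES = {"gpt-4o-mini": 0.15, "claude-sonnet": 3.0, "claude-opus": 15.0}

def blended_cost(mix: dict[str, float]) -> float:
    """Routing mix bo'yicha o'rtacha narx ($ / 1M token)"""
    assert abs(sum(mix.values()) - 1.0) < 1e-9, "ulushlar 1.0 ga yig'ilishi kerak"
    return sum(PRICES[m] * share for m, share in mix.items())

mix = {"gpt-4o-mini": 0.80, "claude-sonnet": 0.15, "claude-opus": 0.05}
cost = blended_cost(mix)                  # 0.12 + 0.45 + 0.75 = $1.32 / 1M token
savings = PRICES["claude-opus"] / cost    # opus-only'ga nisbatan ~11x
```

Mix o'zgarsa (masalan, "hard" savollar ulushi oshsa) tejash darajasi qanchaga tushishini shu funksiya bilan tez tekshirib ko'rish mumkin.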

Part VIII

software architecture patterns

Design patterns, microservices, trade-off'lar va muhandislik tamoyillari. Kod yozish — muhandislikning kichik bir qismi. Katta tizimlarni loyihalash, refactor qilish, jamoa bilan ishlash — bular professional muhandisning asosiy mahorati.

20

design patterns — production'da ishlatiluvchi

20.1

creational patterns — ob'ekt yaratish

Singleton, Factory, Builder — qachon va qanday ishlatiladi?
javob
Design pattern nima va nima uchun kerak?

Design pattern — takror uchraydigan muammolarga isbotlangan yechimlar. Ixtiro qilish shart emas — 1994-yilda "Gang of Four" 23 ta klassik pattern'ni hujjatlashtirgan. Bularni bilish: boshqalar yozgan kodni tushunish (masalan, "bu Factory pattern"), muloqotni qisqartirish ("Singleton ishlataylik" — uzun tushuntirish kerak emas), yaxshi arxitektura tanlash.

Singleton — bitta nusxa

Singleton qachon kerak?

Tizimda faqat bitta nusxa bo'lishi kerak bo'lgan narsalar: DB connection pool, konfigurasiya, logging instance. Python'da oddiy — module-level variable avtomatik singleton. Thread-safe singleton uchun esa ehtiyotkorlik kerak.

patterns/singleton.py
from functools import lru_cache
from threading import Lock

# ── Python'da eng sodda Singleton — module import ──
# config.py
class _Config:
    def __init__(self):
        self.db_url = "postgres://..."
        self.debug = False

_instance = _Config()  # Module import'da bir marta yaratiladi

def get_config() -> _Config:
    return _instance  # Har doim bir xil obyekt

# ── Thread-safe Singleton (generic) ──
class Singleton:
    _instance = None
    _lock: Lock = Lock()

    def __new__(cls):
        if cls._instance is None:           # tez yo'l — har call'da lock olmaslik uchun
            with cls._lock:
                if cls._instance is None:   # double-checked locking
                    cls._instance = super().__new__(cls)
        return cls._instance

# ── FastAPI'da: lru_cache bilan ──
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    db_url: str
    api_key: str

@lru_cache          # Bir marta ishlaydi, natijani cache'laydi = Singleton
def get_settings() -> Settings:
    return Settings()

# ── Singleton anti-pattern: global state testing'ni qiyinlashtiradi ──
# Testing uchun yaxshiroq: Dependency Injection
# class MyService:
#     def __init__(self, config: Config):  # inject
#         self.config = config

Factory Pattern — ob'ekt yaratishni markazlashtirish

Factory qachon kerak?

"Qaysi konkret sinf kerak" qarorini foydalanuvchidan yashirish. Masalan: payment provider tanlash — Stripe, Payme, Click — har biri alohida implementatsiya, lekin bir xil interfeys. Client faqat PaymentFactory.create("payme") deydi, ichida qaysi sinfni ishlatishni Factory hal qiladi.

patterns/factory.py
from abc import ABC, abstractmethod

# ── Abstract Interface ──
class PaymentProvider(ABC):
    @abstractmethod
    async def charge(self, amount: float, currency: str, card_token: str) -> dict:
        ...

    @abstractmethod
    async def refund(self, payment_id: str, amount: float) -> dict:
        ...

# ── Concrete implementations ──
class StripeProvider(PaymentProvider):
    def __init__(self, api_key: str):
        self.stripe = Stripe(api_key)

    async def charge(self, amount, currency, card_token):
        return await self.stripe.payment_intents.create(
            amount=int(amount * 100),
            currency=currency,
            payment_method=card_token,
        )

    async def refund(self, payment_id, amount):
        return await self.stripe.refunds.create(payment_intent=payment_id)

class PaymeProvider(PaymentProvider):
    async def charge(self, amount, currency, card_token):
        # Payme API logikasi
        ...

    async def refund(self, payment_id, amount):
        ...

# ── Factory ──
class PaymentFactory:
    _providers: dict[str, type[PaymentProvider]] = {
        "stripe": StripeProvider,
        "payme": PaymeProvider,
    }

    @classmethod
    def create(cls, provider_name: str, **kwargs) -> PaymentProvider:
        if provider_name not in cls._providers:
            raise ValueError(f"Unknown provider: {provider_name}. "
                           f"Available: {list(cls._providers)}")
        return cls._providers[provider_name](**kwargs)

    @classmethod
    def register(cls, name: str, provider_class: type[PaymentProvider]):
        """Yangi provider qo'shish (Open/Closed principle)"""
        cls._providers[name] = provider_class

# ── Ishlatish ──
provider = PaymentFactory.create("stripe", api_key=settings.STRIPE_KEY)
result = await provider.charge(99.99, "USD", card_token)

# ── Konfiguratsiyadan ──
provider_name = settings.PAYMENT_PROVIDER  # "stripe" yoki "payme"
provider = PaymentFactory.create(provider_name, **settings.payment_config)

Builder Pattern — murakkab ob'ektlar

patterns/builder.py
from dataclasses import dataclass, field
from typing import Self

# ── SQL Query Builder ──
class QueryBuilder:
    """Method chaining bilan SQL query yaratish"""

    def __init__(self, table: str):
        self._table = table
        self._conditions: list[str] = []
        self._columns: list[str] = ["*"]
        self._limit: int | None = None
        self._offset: int = 0
        self._order: str | None = None
        self._params: list = []

    def select(self, *columns: str) -> Self:
        self._columns = list(columns)
        return self

    def where(self, condition: str, *values) -> Self:
        param_start = len(self._params) + 1
        self._conditions.append(
            condition.replace("?", f"${param_start}")
        )
        self._params.extend(values)
        return self

    def limit(self, n: int) -> Self:
        self._limit = n
        return self

    def offset(self, n: int) -> Self:
        self._offset = n
        return self

    def order_by(self, column: str, direction: str = "ASC") -> Self:
        self._order = f"{column} {direction}"
        return self

    def build(self) -> tuple[str, list]:
        cols = ", ".join(self._columns)
        sql = f"SELECT {cols} FROM {self._table}"
        if self._conditions:
            sql += " WHERE " + " AND ".join(self._conditions)
        if self._order:
            sql += f" ORDER BY {self._order}"
        if self._limit is not None:  # 0 ham qiymat — shuning uchun None bilan solishtiramiz
            sql += f" LIMIT {self._limit}"
        if self._offset:
            sql += f" OFFSET {self._offset}"
        return sql, self._params

# ── Ishlatish ──
query, params = (
    QueryBuilder("orders")
    .select("id", "status", "total")
    .where("tenant_id = ?", tenant_id)
    .where("status = ?", "active")
    .order_by("created_at", "DESC")
    .limit(20)
    .offset(40)
    .build()
)
# SELECT id, status, total FROM orders
# WHERE tenant_id = $1 AND status = $2
# ORDER BY created_at DESC LIMIT 20 OFFSET 40
20.2

structural & behavioral patterns — zamonaviy

Repository, Strategy, Observer, Decorator — backend'da qanday qo'llanadi?
javob

Repository Pattern — DB logikasini ajratish

Repository nima uchun muhim?

Service layer to'g'ridan-to'g'ri DB bilan gapirsa — test yozish qiyin (real DB kerak), DB o'zgarsa (PostgreSQL → MongoDB) — butun service qayta yoziladi. Repository — ma'lumotlar bilan ishlash logikasini alohida qatlam. Service faqat nima istashini aytadi, Repository qanday olishni biladi.

patterns/repository.py
from abc import ABC, abstractmethod
import uuid  # InMemory repository'da id generatsiya uchun

# ── Abstract interface ──
class OrderRepository(ABC):
    @abstractmethod
    async def get_by_id(self, order_id: str) -> Order | None: ...
    @abstractmethod
    async def get_by_tenant(self, tenant_id: str, limit: int, offset: int) -> list[Order]: ...
    @abstractmethod
    async def create(self, data: OrderCreate, user_id: str) -> Order: ...
    @abstractmethod
    async def update_status(self, order_id: str, status: str) -> Order: ...

# ── PostgreSQL implementatsiyasi ──
class PostgresOrderRepository(OrderRepository):
    def __init__(self, db):
        self.db = db

    async def get_by_id(self, order_id: str) -> Order | None:
        row = await self.db.fetchrow(
            "SELECT * FROM orders WHERE id = $1", order_id
        )
        return Order(**dict(row)) if row else None

    async def get_by_tenant(self, tenant_id, limit, offset) -> list[Order]:
        rows = await self.db.fetch(
            """SELECT * FROM orders WHERE tenant_id = $1
               ORDER BY created_at DESC LIMIT $2 OFFSET $3""",
            tenant_id, limit, offset
        )
        return [Order(**dict(r)) for r in rows]

    async def create(self, data: OrderCreate, user_id: str) -> Order:
        row = await self.db.fetchrow(
            """INSERT INTO orders (id, user_id, product_id, quantity, status)
               VALUES (gen_random_uuid(), $1, $2, $3, 'pending')
               RETURNING *""",
            user_id, data.product_id, data.quantity
        )
        return Order(**dict(row))

    async def update_status(self, order_id: str, status: str) -> Order:
        row = await self.db.fetchrow(
            "UPDATE orders SET status=$1, updated_at=NOW() WHERE id=$2 RETURNING *",
            status, order_id
        )
        return Order(**dict(row))

# ── In-memory implementatsiyasi (testing uchun!) ──
# ABC'dagi barcha abstract metodlar implement qilinishi shart —
# aks holda klassni instantiate qilib bo'lmaydi
class InMemoryOrderRepository(OrderRepository):
    def __init__(self):
        self._store: dict[str, Order] = {}

    async def get_by_id(self, order_id: str) -> Order | None:
        return self._store.get(order_id)

    async def get_by_tenant(self, tenant_id, limit, offset) -> list[Order]:
        orders = [o for o in self._store.values() if o.tenant_id == tenant_id]
        return orders[offset:offset + limit]

    async def create(self, data: OrderCreate, user_id: str) -> Order:
        order = Order(id=str(uuid.uuid4()), user_id=user_id, **data.dict())
        self._store[order.id] = order
        return order

    async def update_status(self, order_id: str, status: str) -> Order:
        order = self._store[order_id]
        order.status = status
        return order

# ── Service — Repository inject qilinadi ──
class OrderService:
    def __init__(self, repo: OrderRepository):  # Abstract type!
        self.repo = repo

    async def create_order(self, data: OrderCreate, user: User) -> Order:
        # Biznes logikasi faqat shu yerda
        if data.quantity > 100:
            raise ValueError("Maximum 100 ta buyurtma")
        order = await self.repo.create(data, user.id)
        await self.notify_warehouse(order)
        return order

# Dependency injection:
# Production: OrderService(PostgresOrderRepository(db))
# Test:       OrderService(InMemoryOrderRepository())

Strategy Pattern — algoritm almashtirish

patterns/strategy.py
from abc import ABC, abstractmethod

# ── Notification strategy ──
class NotificationStrategy(ABC):
    @abstractmethod
    async def send(self, user: User, message: str) -> bool: ...

class EmailStrategy(NotificationStrategy):
    async def send(self, user, message):
        await sendgrid.send(to=user.email, body=message)
        return True

class SMSStrategy(NotificationStrategy):
    async def send(self, user, message):
        await twilio.send(to=user.phone, body=message[:160])
        return True

class PushStrategy(NotificationStrategy):
    async def send(self, user, message):
        await fcm.send(token=user.device_token, body=message)
        return True

class MultiStrategy(NotificationStrategy):
    """Bir nechta kanal — fallback bilan"""
    def __init__(self, strategies: list[NotificationStrategy]):
        self.strategies = strategies

    async def send(self, user, message):
        for strategy in self.strategies:
            try:
                success = await strategy.send(user, message)
                if success:
                    return True
            except Exception:
                continue  # Keyingi kanalga o'tish
        return False

# ── Notification service — strategy inject ──
class NotificationService:
    def __init__(self, strategy: NotificationStrategy):
        self.strategy = strategy

    async def notify(self, user: User, message: str):
        return await self.strategy.send(user, message)

    def set_strategy(self, strategy: NotificationStrategy):
        self.strategy = strategy  # Runtime'da almashtirish

# ── Konfiguratsiyadan strategy tanlash ──
STRATEGIES = {
    "email": EmailStrategy(),
    "sms": SMSStrategy(),
    "push": PushStrategy(),
    "multi": MultiStrategy([PushStrategy(), EmailStrategy()]),
}

notifier = NotificationService(STRATEGIES[settings.NOTIFICATION_CHANNEL])

Observer Pattern — event system

patterns/observer.py
import asyncio
from collections import defaultdict

# ── Simple async event bus ──
class EventBus:
    _instance = None
    _handlers: dict[str, list] = defaultdict(list)

    @classmethod
    def subscribe(cls, event: str, handler):
        """Event'ga handler qo'shish"""
        cls._handlers[event].append(handler)

    @classmethod
    async def publish(cls, event: str, data: dict):
        """Event chiqarish — barcha handler'larni chaqirish"""
        handlers = cls._handlers.get(event, [])
        if not handlers:
            return
        # Barcha handler'lar parallel ishlaydi
        await asyncio.gather(
            *[handler(data) for handler in handlers],
            return_exceptions=True  # Bir handler xatosi boshqasini to'xtatmasin
        )

# ── Handler'lar ──
async def send_order_confirmation(data: dict):
    await email_service.send(
        to=data["email"],
        subject=f"Order #{data['order_id']} confirmed",
        template="order_confirmation"
    )

async def update_inventory(data: dict):
    await inventory_service.decrease(
        product_id=data["product_id"],
        quantity=data["quantity"]
    )

async def notify_warehouse(data: dict):
    await warehouse_api.post("/new-order", json=data)

# ── Subscribe ──
EventBus.subscribe("order.created", send_order_confirmation)
EventBus.subscribe("order.created", update_inventory)
EventBus.subscribe("order.created", notify_warehouse)

# ── Publish ──
@router.post("/orders")
async def create_order(data: OrderCreate, user: User = Depends(get_current_user)):
    order = await order_service.create(data, user)

    # Event chiqarish — handler'lar parallel ishlaydi
    await EventBus.publish("order.created", {
        "order_id": str(order.id),
        "email": user.email,
        "product_id": str(data.product_id),
        "quantity": data.quantity,
    })

    return order
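A minimal, standalone sketch (with a stripped-down copy of the bus above) showing why `return_exceptions=True` matters: a failing handler is captured as a result instead of raised, so sibling handlers still run.

```python
import asyncio
from collections import defaultdict

class EventBus:
    _handlers: dict[str, list] = defaultdict(list)

    @classmethod
    def subscribe(cls, event: str, handler):
        cls._handlers[event].append(handler)

    @classmethod
    async def publish(cls, event: str, data: dict):
        # return_exceptions=True: a handler's exception is returned
        # in the results list, not raised — siblings keep running
        return await asyncio.gather(
            *[h(data) for h in cls._handlers.get(event, [])],
            return_exceptions=True,
        )

calls = []

async def ok_handler(data):
    calls.append(("ok", data["id"]))

async def broken_handler(data):
    raise RuntimeError("boom")

EventBus.subscribe("demo", ok_handler)
EventBus.subscribe("demo", broken_handler)

results = asyncio.run(EventBus.publish("demo", {"id": 1}))
print(calls)                                  # [('ok', 1)]
print(isinstance(results[1], RuntimeError))   # True
```

Without `return_exceptions=True`, the first raised exception would propagate out of `gather` and cancel the remaining handlers.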
21

microservices — architecture & patterns

21.1

microservices — when and how to split?

When should a monolith be converted to microservices, and how do you split it?
answer
The most common mistake — premature microservices

Nearly every startup begins with microservices — and that is a mistake. Microservices bring in the full complexity of distributed systems: service discovery, network latency, distributed tracing, data consistency. For a team of up to 10 developers, a well-architected monolith is usually the better choice. Even Amazon started on a monolith in 2002 and split it up later.

When to split: when the team passes 50 people (Conway's Law), when one part needs to scale independently (e.g. ML inference needs 100x more resources), to speed up delivery (independent deployments), or when different technologies are required (a Go service + Python ML).

[Diagram — monolith vs microservices. Monolith: Auth, Orders, Products, and Notifications modules over one shared PostgreSQL (+ simple debugging, + fast iteration, − coupled deploys). It evolves into microservices: an API Gateway in front of Auth (Go + Redis), Orders (Python + PG), ML (Python + GPU), Notify (Node + Kafka), and Payment (Java + PG) services, connected by a Kafka event bus for async communication.]

Microservices decomposition — how do you split?

Domain-Driven Design (DDD) — the right way to split

Split along business domains, not technical lines (database, UI, API). A Bounded Context: the "Orders" process — creating, tracking, and cancelling orders — is one service. "Payments" — charging, refunds, receipts — is a separate service. They notify each other through Kafka events and never reach directly into each other's database.

Decomposition principle | Explanation | Example
Business capability | By business function | Orders, Payments, Inventory, Auth
Domain | DDD Bounded Context | Customer, Catalog, Fulfillment
Team ownership | Conway's Law — one team = one service | Backend team → core API, ML team → inference
Scale requirement | Needs to scale independently | ML inference → GPU pods, API → CPU pods
Technology fit | Each service on its own stack | Go auth (fast), Python ML, Node realtime
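The bounded-context rule above — notify other contexts via events, never reach into their database — can be sketched with an in-memory stand-in for Kafka. All names here (`OrdersContext`, `PaymentsContext`, the topic string) are illustrative, not from the original:

```python
class InMemoryBus:
    """Stand-in for Kafka: topic -> list of subscriber callbacks."""
    def __init__(self):
        self.subs: dict[str, list] = {}

    def subscribe(self, topic: str, handler):
        self.subs.setdefault(topic, []).append(handler)

    def publish(self, topic: str, event: dict):
        for handler in self.subs.get(topic, []):
            handler(event)

class OrdersContext:
    def __init__(self, bus: InMemoryBus):
        self.bus = bus
        self.orders = {}          # private to Orders — no other context reads it

    def place_order(self, order_id: str, amount: float):
        self.orders[order_id] = {"amount": amount, "status": "placed"}
        # The only coupling between contexts is the event, not a DB write
        self.bus.publish("order.placed", {"order_id": order_id, "amount": amount})

class PaymentsContext:
    def __init__(self, bus: InMemoryBus):
        self.invoices = {}        # private to Payments
        bus.subscribe("order.placed", self.on_order_placed)

    def on_order_placed(self, event: dict):
        self.invoices[event["order_id"]] = event["amount"]

bus = InMemoryBus()
orders = OrdersContext(bus)
payments = PaymentsContext(bus)
orders.place_order("o-1", 42.0)
print(payments.invoices)   # {'o-1': 42.0}
```

Swapping the in-memory bus for Kafka changes delivery semantics (async, at-least-once) but not the ownership boundary: each context still persists only its own state.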

Service communication patterns

service_communication.py
"""
Microservices o'rtasida kommunikatsiya:
1. Sync REST/gRPC — darhol javob kerak
2. Async Kafka — eventual consistency OK
3. Service mesh — mTLS, retry, circuit breaker
"""

# ── 1. Sync gRPC — internal service call ──
import grpc
from proto import orders_pb2, orders_pb2_grpc

async def get_order_status(order_id: str) -> str:
    """Sync call — javob kerak"""
    async with grpc.aio.insecure_channel("orders-service:50051") as channel:
        stub = orders_pb2_grpc.OrdersStub(channel)
        response = await stub.GetOrder(
            orders_pb2.GetOrderRequest(order_id=order_id),
            timeout=2.0  # 2-second timeout
        )
        return response.status

# ── 2. Circuit Breaker ──
import httpx
from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=30)
async def call_payment_service(order_id: str, amount: float):
    """After 5 consecutive failures — stay "open" for 30 seconds"""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://payment-service/charge",
            json={"order_id": order_id, "amount": amount},
            timeout=5.0
        )
        response.raise_for_status()
        return response.json()

# ── 3. Async via Kafka ──
async def process_order_completed(order: Order):
    """When the order completes — async fan-out"""
    # Notify downstream services asynchronously
    await kafka.send("order.completed", {
        "order_id": str(order.id),
        "user_id": str(order.user_id),
        "amount": float(order.total),
        "items": [{"product_id": str(i.product_id), "qty": i.quantity}
                  for i in order.items]
    })
    # Each service consumes at its own pace:
    # - notification-service: sends the email
    # - inventory-service: decrements stock
    # - analytics-service: updates statistics

# ── 4. Saga — distributed transaction ──
class OrderSaga:
    """Distributed transaction with compensation steps"""

    def __init__(self):
        # Instance-local — class-level lists would be shared across sagas
        self.compensations = []

    async def execute(self, order_data):
        try:
            # Step 1: Payment
            payment = await payment_service.charge(order_data)
            self.compensations.append(
                lambda: payment_service.refund(payment.id)
            )

            # Step 2: Inventory
            reservation = await inventory_service.reserve(order_data)
            self.compensations.append(
                lambda: inventory_service.release(reservation.id)
            )

            # Step 3: Create the order
            order = await order_service.create(order_data, payment.id)
            return order

        except Exception:
            # Compensate every completed step, in reverse order
            for compensate in reversed(self.compensations):
                await compensate()
            raise
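A runnable miniature of the same compensation logic, with stub services (all names illustrative): when a later step fails, the already-completed steps are undone in reverse order.

```python
import asyncio

log = []

async def charge():            # step 1 — succeeds
    log.append("charge")
    return "pay-1"

async def refund(payment_id):  # compensation for step 1
    log.append(f"refund:{payment_id}")

async def reserve():           # step 2 — fails
    raise RuntimeError("out of stock")

async def run_saga():
    compensations = []  # built up only as steps actually succeed
    try:
        payment_id = await charge()
        compensations.append(lambda: refund(payment_id))
        await reserve()
    except Exception:
        # Undo completed steps in reverse order
        for compensate in reversed(compensations):
            await compensate()
        return "rolled_back"
    return "ok"

result = asyncio.run(run_saga())
print(result)  # rolled_back
print(log)     # ['charge', 'refund:pay-1']
```

Note that only completed steps get compensations: `reserve` failed, so there is nothing to release, and only the payment is refunded.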
21.2

api gateway, service mesh, observability

How is microservices infrastructure managed — service discovery, tracing, health?
answer
The "vertical" problems of microservices

Every service would need its own auth middleware, rate limiting, logging, retry logic, and TLS — these are cross-cutting concerns, and duplicating them in every service is a nightmare. The solution: an API Gateway (external traffic) and a Service Mesh (internal traffic). Gateway: a single entry point — routing, auth, rate limiting. Service Mesh (Istio, Linkerd): the sidecar-proxy pattern — a proxy alongside every pod, putting service-to-service traffic under control.

[Diagram: internet client → API Gateway (Kong / AWS ALB: JWT verification, rate limiting, SSL termination) → Auth, Orders, and ML services, each with a sidecar proxy. Observability stack: Jaeger/Tempo — distributed tracing (each request → span tree → find the bottleneck); Prometheus — metrics (error rate, latency, requests/sec); Loki/ELK — structured JSON logs with correlation IDs; Grafana — dashboards + alerts.]
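Rate limiting is one of the cross-cutting concerns a gateway centralizes. A minimal token-bucket sketch, independent of any particular gateway product (parameters are illustrative):

```python
import time

class TokenBucket:
    """Allow `rate` requests/second with bursts of up to `capacity`."""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)   # start with a full bucket
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=1, capacity=3)
results = [bucket.allow() for _ in range(5)]
print(results)  # the first 3 fit the burst, the rest are throttled
```

At a gateway this check runs per client key (API key, tenant ID, IP) before the request is routed; production gateways keep the counters in Redis so all gateway replicas share them.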

Distributed tracing — following a request's path

tracing_setup.py
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

def setup_tracing(app: FastAPI, service_name: str):
    """OpenTelemetry — auto-instrumentation + manual spans"""

    # Exporter — ship spans to Jaeger or Tempo
    exporter = OTLPSpanExporter(endpoint="http://jaeger:4317")
    provider = TracerProvider()
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)

    # Auto-instrumentation — FastAPI, asyncpg, httpx
    FastAPIInstrumentor.instrument_app(app)
    AsyncPGInstrumentor().instrument()
    HTTPXClientInstrumentor().instrument()

# ── Manual span — for an important function ──
tracer = trace.get_tracer(__name__)

async def process_order(order_id: str):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        # DB span is added automatically (asyncpg instrumentation)
        order = await db.fetchrow("SELECT * FROM orders WHERE id = $1", order_id)

        with tracer.start_as_current_span("validate_inventory"):
            is_available = await check_inventory(order["product_id"])
            span.set_attribute("inventory.available", is_available)

        if not is_available:
            span.set_attribute("order.cancelled", True)
            span.set_status(trace.Status(trace.StatusCode.ERROR))
            raise OutOfStockError(order["product_id"])

        return order

# As seen in Jaeger:
# /api/v1/orders/create → [14ms]
#   → Auth middleware [1ms]
#   → process_order [12ms]
#     → DB query [4ms]
#     → validate_inventory [6ms]
#       → inventory-service HTTP call [5ms]
Microservices anti-patterns — avoid these

Distributed monolith: services deploy separately but are synchronously coupled — when one goes down, everything goes down. Shared database: several services writing to one DB block each other and make migrations impossible. Nano-services: every function as its own service — gigantic network overhead. No versioning: changing a service's API without versioning — consumers break.
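The versioning anti-pattern is usually avoided by tagging every payload with a schema version and keeping consumers tolerant of older versions. A hedged sketch — the field names and the v1/v2 shapes are illustrative, not a real schema:

```python
def normalize_order_event(event: dict) -> dict:
    """Consumer that accepts both v1 and v2 of an event schema.

    v1: {"version": 1, "amount_cents": int}              (implicit USD)
    v2: {"version": 2, "amount": {"value": int, "currency": str}}
    """
    version = event.get("version", 1)  # missing version -> treat as oldest
    if version == 1:
        return {"amount_cents": event["amount_cents"], "currency": "USD"}
    if version == 2:
        return {"amount_cents": event["amount"]["value"],
                "currency": event["amount"]["currency"]}
    raise ValueError(f"unsupported schema version: {version}")

old = normalize_order_event({"version": 1, "amount_cents": 500})
new = normalize_order_event({"version": 2,
                             "amount": {"value": 700, "currency": "EUR"}})
print(old)  # {'amount_cents': 500, 'currency': 'USD'}
print(new)  # {'amount_cents': 700, 'currency': 'EUR'}
```

The same idea applies to HTTP APIs (`/v1/orders` vs `/v2/orders`): producers may add versions, but never change the meaning of an existing one, so consumers migrate on their own schedule.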