Snowflake算法 (Python版)

OPai

# GenID -> id: string
// 实现一个相对完整的Snowflake算法

Code


import time
import threading

class SnowflakeGenerator:
    """Thread-safe generator of Snowflake-style integer IDs.

    Bit layout of a generated ID, most significant field first:

        milliseconds since ``epoch`` | datacenter id (5 bits)
        | worker id (5 bits) | per-millisecond sequence (12 bits)
    """

    def __init__(self, worker_id=0, datacenter_id=0):
        """Create a generator bound to one worker/datacenter pair.

        Args:
            worker_id: Integer in ``[0, max_worker_id]`` (0..31).
            datacenter_id: Integer in ``[0, max_datacenter_id]`` (0..31).

        Raises:
            ValueError: If either id falls outside its bit field. An
                out-of-range value would spill into neighbouring fields
                and silently produce colliding IDs.
        """
        # Epoch is set to Jan 1, 2020 00:00:00 UTC (in milliseconds)
        self.epoch = 1577836800000

        # Bit lengths
        self.worker_id_bits = 5
        self.datacenter_id_bits = 5
        self.sequence_bits = 12

        # Largest value that fits in each id field (all bits set).
        self.max_worker_id = (1 << self.worker_id_bits) - 1
        self.max_datacenter_id = (1 << self.datacenter_id_bits) - 1

        if not 0 <= worker_id <= self.max_worker_id:
            raise ValueError(
                "worker_id must be between 0 and {}, got {!r}".format(
                    self.max_worker_id, worker_id))
        if not 0 <= datacenter_id <= self.max_datacenter_id:
            raise ValueError(
                "datacenter_id must be between 0 and {}, got {!r}".format(
                    self.max_datacenter_id, datacenter_id))

        self.worker_id = worker_id
        self.datacenter_id = datacenter_id
        self.sequence = 0

        # Shifts: how far each field is moved left when packing the ID
        self.worker_shift = self.sequence_bits
        self.datacenter_shift = self.sequence_bits + self.worker_id_bits
        self.timestamp_shift = (self.sequence_bits + self.worker_id_bits
                                + self.datacenter_id_bits)
        # Mask used to wrap the sequence counter back to 0 on overflow
        self.sequence_mask = (1 << self.sequence_bits) - 1

        # -1 means "no ID generated yet"
        self.last_timestamp = -1
        self.lock = threading.Lock()

    def _current_time(self):
        """Return the current wall-clock time in whole milliseconds."""
        return int(time.time() * 1000)

    def _wait_next_millis(self, last_timestamp):
        """Busy-wait until the clock reads later than ``last_timestamp``.

        Returns:
            The first observed millisecond timestamp greater than
            ``last_timestamp``.
        """
        timestamp = self._current_time()
        while timestamp <= last_timestamp:
            timestamp = self._current_time()
        return timestamp

    def generate_id(self):
        """Return the next ID as an int.

        Raises:
            RuntimeError: If the clock reads earlier than the timestamp of
                the previously generated ID.
        """
        with self.lock:
            timestamp = self._current_time()

            if timestamp < self.last_timestamp:
                raise RuntimeError(
                    "Clock moved backwards. Refusing to generate id for {} milliseconds".format(
                        self.last_timestamp - timestamp))

            if timestamp == self.last_timestamp:
                # Same millisecond as the previous ID: bump the sequence.
                self.sequence = (self.sequence + 1) & self.sequence_mask
                if self.sequence == 0:
                    # Sequence space exhausted for this millisecond; move
                    # on to the next one so the ID stays unique.
                    timestamp = self._wait_next_millis(self.last_timestamp)
            else:
                # New millisecond: the sequence starts over.
                self.sequence = 0

            self.last_timestamp = timestamp

            return (((timestamp - self.epoch) << self.timestamp_shift)
                    | (self.datacenter_id << self.datacenter_shift)
                    | (self.worker_id << self.worker_shift)
                    | self.sequence)

# Shared generator reused by every GenID() call. A fresh generator per call
# always starts at sequence 0 with no timestamp history, so two calls within
# the same millisecond would return identical IDs.
_shared_generator = None
_shared_generator_lock = threading.Lock()


def GenID():
    """Return a new Snowflake ID as a decimal string.

    All calls share one lazily created ``SnowflakeGenerator`` (worker 0,
    datacenter 0) so that IDs produced in the same millisecond receive
    distinct sequence numbers.
    """
    global _shared_generator
    if _shared_generator is None:
        # Guard creation so concurrent first callers end up with the same
        # generator instead of each building their own.
        with _shared_generator_lock:
            if _shared_generator is None:
                _shared_generator = SnowflakeGenerator()
    return str(_shared_generator.generate_id())